systemprompt_cloud/checkout/
client.rs1use anyhow::{anyhow, Result};
2use axum::extract::{Path, Query, State};
3use axum::response::{Html, Json};
4use axum::routing::get;
5use axum::Router;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use std::time::Duration;
10use systemprompt_logging::CliService;
11use tokio::sync::{oneshot, Mutex};
12
13use crate::api_client::{CheckoutEvent, ProvisioningEventType};
14use crate::constants::checkout::{CALLBACK_PORT, CALLBACK_TIMEOUT_SECS};
15use crate::CloudApiClient;
16
17#[derive(Debug, Deserialize)]
18struct CallbackParams {
19 transaction_id: Option<String>,
20 tenant_id: Option<String>,
21 status: Option<String>,
22 error: Option<String>,
23 checkout_session_id: Option<String>,
24}
25
26#[derive(Debug, Clone, Serialize)]
27struct StatusResponse {
28 status: String,
29 message: Option<String>,
30 app_url: Option<String>,
31}
32
33#[derive(Debug, Clone)]
34pub struct CheckoutCallbackResult {
35 pub transaction_id: String,
36 pub tenant_id: String,
37 pub fly_app_name: Option<String>,
38 pub needs_deploy: bool,
39}
40
41#[derive(Debug, Clone, Copy)]
42#[allow(clippy::struct_field_names)]
43pub struct CheckoutTemplates {
44 pub success_html: &'static str,
45 pub error_html: &'static str,
46 pub waiting_html: &'static str,
47}
48
49#[allow(clippy::struct_field_names)]
50struct AppState {
51 tx: Arc<Mutex<Option<oneshot::Sender<Result<CheckoutCallbackResult>>>>>,
52 api_client: Arc<CloudApiClient>,
53 success_html: String,
54 error_html: String,
55 waiting_html: String,
56}
57
58pub async fn run_checkout_callback_flow(
59 api_client: &CloudApiClient,
60 checkout_url: &str,
61 templates: CheckoutTemplates,
62) -> Result<CheckoutCallbackResult> {
63 let (tx, rx) = oneshot::channel::<Result<CheckoutCallbackResult>>();
64 let tx = Arc::new(Mutex::new(Some(tx)));
65
66 let state = AppState {
67 tx: Arc::clone(&tx),
68 api_client: Arc::new(CloudApiClient::new(
69 api_client.api_url(),
70 api_client.token(),
71 )),
72 success_html: templates.success_html.to_string(),
73 error_html: templates.error_html.to_string(),
74 waiting_html: templates.waiting_html.to_string(),
75 };
76
77 let app = Router::new()
78 .route("/callback", get(callback_handler))
79 .route("/status/{tenant_id}", get(status_handler))
80 .with_state(Arc::new(state));
81
82 let addr = format!("127.0.0.1:{CALLBACK_PORT}");
83 let listener = tokio::net::TcpListener::bind(&addr).await?;
84
85 CliService::info(&format!(
86 "Starting checkout callback server on http://{addr}"
87 ));
88
89 CliService::info("Opening Paddle checkout in your browser...");
90 CliService::info(&format!("URL: {checkout_url}"));
91
92 if let Err(e) = open::that(checkout_url) {
93 CliService::warning(&format!("Could not open browser automatically: {e}"));
94 CliService::info("Please open this URL manually:");
95 CliService::key_value("URL", checkout_url);
96 }
97
98 CliService::info("Waiting for checkout completion...");
99 CliService::info(&format!("(timeout in {CALLBACK_TIMEOUT_SECS} seconds)"));
100
101 let server = axum::serve(listener, app);
102
103 tokio::select! {
104 result = rx => {
105 result.map_err(|_| anyhow!("Checkout cancelled"))?
106 }
107 _ = server => {
108 Err(anyhow!("Server stopped unexpectedly"))
109 }
110 () = tokio::time::sleep(Duration::from_secs(CALLBACK_TIMEOUT_SECS)) => {
111 Err(anyhow!("Checkout timed out after {CALLBACK_TIMEOUT_SECS} seconds"))
112 }
113 }
114}
115
116async fn callback_handler(
117 State(state): State<Arc<AppState>>,
118 Query(params): Query<CallbackParams>,
119) -> Html<String> {
120 if let Some(error) = ¶ms.error {
121 tracing::error!(error = %error, "Checkout error from callback");
122 send_result(&state.tx, Err(anyhow!("Checkout error: {}", error))).await;
123 return Html(state.error_html.clone());
124 }
125
126 if let (Some(transaction_id), Some(tenant_id)) =
127 (params.transaction_id.clone(), params.tenant_id.clone())
128 {
129 match params.status.as_deref() {
130 Some("completed") => {
131 let result = Ok(CheckoutCallbackResult {
132 transaction_id,
133 tenant_id: tenant_id.clone(),
134 fly_app_name: None,
135 needs_deploy: false,
136 });
137 send_result(&state.tx, result).await;
138 let html = state.success_html.replace("{{TENANT_ID}}", &tenant_id);
139 return Html(html);
140 },
141 Some(status) => {
142 send_result(&state.tx, Err(anyhow!("Checkout status: {}", status))).await;
143 return Html(state.error_html.clone());
144 },
145 None => {
146 send_result(
147 &state.tx,
148 Err(anyhow!(
149 "Checkout callback missing required 'status' parameter"
150 )),
151 )
152 .await;
153 return Html(state.error_html.clone());
154 },
155 }
156 }
157
158 if params.status.as_deref() == Some("pending") {
159 if let Some(checkout_session_id) = params.checkout_session_id.clone() {
160 CliService::info("Payment confirmed, waiting for provisioning...");
161
162 let api_client = Arc::clone(&state.api_client);
163 let tx = Arc::clone(&state.tx);
164 let transaction_id = params
165 .transaction_id
166 .clone()
167 .unwrap_or_else(|| checkout_session_id.clone());
168
169 tokio::spawn(async move {
170 match wait_for_checkout_provisioning(&api_client, &checkout_session_id).await {
171 Ok(prov_result) => {
172 let result = Ok(CheckoutCallbackResult {
173 transaction_id,
174 tenant_id: prov_result.event.tenant_id,
175 fly_app_name: prov_result.event.fly_app_name,
176 needs_deploy: prov_result.needs_deploy,
177 });
178 send_result(&tx, result).await;
179 },
180 Err(e) => {
181 send_result(&tx, Err(e)).await;
182 },
183 }
184 });
185
186 return Html(state.waiting_html.clone());
187 }
188
189 send_result(
190 &state.tx,
191 Err(anyhow!("Pending status but no checkout_session_id")),
192 )
193 .await;
194 return Html(state.error_html.clone());
195 }
196
197 send_result(
198 &state.tx,
199 Err(anyhow!("Missing transaction_id or tenant_id in callback")),
200 )
201 .await;
202 Html(state.error_html.clone())
203}
204
205async fn send_result(
206 tx: &Arc<Mutex<Option<oneshot::Sender<Result<CheckoutCallbackResult>>>>>,
207 result: Result<CheckoutCallbackResult>,
208) {
209 if let Some(sender) = tx.lock().await.take() {
210 if sender.send(result).is_err() {
211 tracing::warn!("Checkout result receiver dropped");
212 }
213 }
214}
215
216struct CheckoutProvisioningResult {
217 event: CheckoutEvent,
218 needs_deploy: bool,
219}
220
221async fn wait_for_checkout_provisioning(
222 client: &CloudApiClient,
223 checkout_session_id: &str,
224) -> Result<CheckoutProvisioningResult> {
225 let mut stream = client.subscribe_checkout_events(checkout_session_id);
226
227 while let Some(event_result) = stream.next().await {
228 match event_result {
229 Ok(event) => {
230 if let Some(msg) = &event.message {
231 CliService::info(msg);
232 }
233
234 match event.event_type {
235 ProvisioningEventType::InfrastructureReady => {
236 return Ok(CheckoutProvisioningResult {
237 event,
238 needs_deploy: true,
239 });
240 },
241 ProvisioningEventType::TenantReady => {
242 return Ok(CheckoutProvisioningResult {
243 event,
244 needs_deploy: false,
245 });
246 },
247 ProvisioningEventType::ProvisioningFailed => {
248 return Err(anyhow!(
249 "Provisioning failed: {}",
250 event.message.as_deref().unwrap_or("Unknown error")
251 ));
252 },
253 _ => {},
254 }
255 },
256 Err(e) => {
257 return Err(anyhow!("SSE stream error: {}", e));
258 },
259 }
260 }
261
262 Err(anyhow!("SSE stream closed unexpectedly"))
263}
264
265async fn status_handler(
266 State(state): State<Arc<AppState>>,
267 Path(tenant_id): Path<String>,
268) -> Json<StatusResponse> {
269 match state.api_client.get_tenant_status(&tenant_id).await {
270 Ok(status) => Json(StatusResponse {
271 status: status.status,
272 message: status.message,
273 app_url: status.app_url,
274 }),
275 Err(e) => Json(StatusResponse {
276 status: "error".to_string(),
277 message: Some(e.to_string()),
278 app_url: None,
279 }),
280 }
281}