1use thiserror::Error;
4
5use crate::api::{
6 DeployPipelineRequest, DeployPipelineResponse, PipelineListResponse, UsageResponse,
7};
8
9#[derive(Debug, Error)]
10pub enum ClientError {
11 #[error("HTTP error: {0}")]
12 Http(#[from] reqwest::Error),
13
14 #[error("API error ({status}): {message}")]
15 Api { status: u16, message: String },
16}
17
18#[derive(Debug)]
20pub struct VarpulisClient {
21 client: reqwest::Client,
22 base_url: String,
23 api_key: String,
24}
25
26impl VarpulisClient {
27 pub fn new(base_url: &str, api_key: &str) -> Self {
28 Self {
29 client: reqwest::Client::new(),
30 base_url: base_url.trim_end_matches('/').to_string(),
31 api_key: api_key.to_string(),
32 }
33 }
34
35 pub async fn deploy_pipeline(
37 &self,
38 name: &str,
39 source: &str,
40 ) -> Result<DeployPipelineResponse, ClientError> {
41 let url = format!("{}/api/v1/pipelines", self.base_url);
42 let body = DeployPipelineRequest {
43 name: name.to_string(),
44 source: source.to_string(),
45 };
46 let resp = self
47 .client
48 .post(&url)
49 .header("x-api-key", &self.api_key)
50 .json(&body)
51 .send()
52 .await?;
53
54 if !resp.status().is_success() {
55 let status = resp.status().as_u16();
56 let text = resp.text().await.unwrap_or_default();
57 return Err(ClientError::Api {
58 status,
59 message: text,
60 });
61 }
62
63 Ok(resp.json().await?)
64 }
65
66 pub async fn list_pipelines(&self) -> Result<PipelineListResponse, ClientError> {
68 let url = format!("{}/api/v1/pipelines", self.base_url);
69 let resp = self
70 .client
71 .get(&url)
72 .header("x-api-key", &self.api_key)
73 .send()
74 .await?;
75
76 if !resp.status().is_success() {
77 let status = resp.status().as_u16();
78 let text = resp.text().await.unwrap_or_default();
79 return Err(ClientError::Api {
80 status,
81 message: text,
82 });
83 }
84
85 Ok(resp.json().await?)
86 }
87
88 pub async fn delete_pipeline(&self, pipeline_id: &str) -> Result<(), ClientError> {
90 let url = format!("{}/api/v1/pipelines/{}", self.base_url, pipeline_id);
91 let resp = self
92 .client
93 .delete(&url)
94 .header("x-api-key", &self.api_key)
95 .send()
96 .await?;
97
98 if !resp.status().is_success() {
99 let status = resp.status().as_u16();
100 let text = resp.text().await.unwrap_or_default();
101 return Err(ClientError::Api {
102 status,
103 message: text,
104 });
105 }
106
107 Ok(())
108 }
109
110 pub fn logs_url(&self, pipeline_id: &str) -> String {
113 format!("{}/api/v1/pipelines/{}/logs", self.base_url, pipeline_id)
114 }
115
116 pub fn api_key(&self) -> &str {
118 &self.api_key
119 }
120
121 pub async fn get_usage(&self) -> Result<UsageResponse, ClientError> {
123 let url = format!("{}/api/v1/usage", self.base_url);
124 let resp = self
125 .client
126 .get(&url)
127 .header("x-api-key", &self.api_key)
128 .send()
129 .await?;
130
131 if !resp.status().is_success() {
132 let status = resp.status().as_u16();
133 let text = resp.text().await.unwrap_or_default();
134 return Err(ClientError::Api {
135 status,
136 message: text,
137 });
138 }
139
140 Ok(resp.json().await?)
141 }
142}
143
144#[cfg(test)]
145mod tests {
146 use std::sync::Arc;
147
148 use tokio::sync::RwLock;
149 use varpulis_runtime::tenant::{TenantManager, TenantQuota};
150
151 use super::*;
152 use crate::api;
153
154 async fn start_test_server() -> (String, String) {
155 let mut mgr = TenantManager::new();
156 let api_key = "test-client-key".to_string();
157 mgr.create_tenant("Test".into(), api_key.clone(), TenantQuota::default())
158 .unwrap();
159 let manager = Arc::new(RwLock::new(mgr));
160 let app = api::api_routes(
161 manager,
162 None,
163 None,
164 None,
165 #[cfg(feature = "saas")]
166 None,
167 );
168
169 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
170 let addr = listener.local_addr().unwrap();
171 tokio::spawn(async move {
172 axum::serve(listener, app.into_make_service())
173 .await
174 .unwrap();
175 });
176
177 let base_url = format!("http://{addr}");
178 (base_url, api_key)
179 }
180
181 #[tokio::test]
182 async fn test_client_deploy_and_list() {
183 let (base_url, api_key) = start_test_server().await;
184 let client = VarpulisClient::new(&base_url, &api_key);
185
186 let resp = client
188 .deploy_pipeline("Test Pipeline", "stream A = Events .where(x > 1)")
189 .await
190 .unwrap();
191 assert_eq!(resp.name, "Test Pipeline");
192 assert_eq!(resp.status, "running");
193
194 let list = client.list_pipelines().await.unwrap();
196 assert_eq!(list.total, 1);
197 assert_eq!(list.pipelines[0].name, "Test Pipeline");
198 }
199
200 #[tokio::test]
201 async fn test_client_delete_pipeline() {
202 let (base_url, api_key) = start_test_server().await;
203 let client = VarpulisClient::new(&base_url, &api_key);
204
205 let resp = client
206 .deploy_pipeline("ToDelete", "stream B = Events .where(y > 2)")
207 .await
208 .unwrap();
209
210 client.delete_pipeline(&resp.id).await.unwrap();
211
212 let list = client.list_pipelines().await.unwrap();
213 assert_eq!(list.total, 0);
214 }
215
216 #[tokio::test]
217 async fn test_client_get_usage() {
218 let (base_url, api_key) = start_test_server().await;
219 let client = VarpulisClient::new(&base_url, &api_key);
220
221 let usage = client.get_usage().await.unwrap();
222 assert_eq!(usage.active_pipelines, 0);
223 }
224
225 #[tokio::test]
226 async fn test_client_invalid_api_key() {
227 let (base_url, _) = start_test_server().await;
228 let client = VarpulisClient::new(&base_url, "wrong-key");
229
230 let result = client.list_pipelines().await;
231 assert!(result.is_err());
232 match result.unwrap_err() {
233 ClientError::Api { status, .. } => assert_eq!(status, 401),
234 other => panic!("Expected Api error, got: {other:?}"),
235 }
236 }
237}