1use crate::api::{
4 DeployPipelineRequest, DeployPipelineResponse, PipelineListResponse, UsageResponse,
5};
6use thiserror::Error;
7
8#[derive(Debug, Error)]
9pub enum ClientError {
10 #[error("HTTP error: {0}")]
11 Http(#[from] reqwest::Error),
12
13 #[error("API error ({status}): {message}")]
14 Api { status: u16, message: String },
15}
16
17#[derive(Debug)]
19pub struct VarpulisClient {
20 client: reqwest::Client,
21 base_url: String,
22 api_key: String,
23}
24
25impl VarpulisClient {
26 pub fn new(base_url: &str, api_key: &str) -> Self {
27 Self {
28 client: reqwest::Client::new(),
29 base_url: base_url.trim_end_matches('/').to_string(),
30 api_key: api_key.to_string(),
31 }
32 }
33
34 pub async fn deploy_pipeline(
36 &self,
37 name: &str,
38 source: &str,
39 ) -> Result<DeployPipelineResponse, ClientError> {
40 let url = format!("{}/api/v1/pipelines", self.base_url);
41 let body = DeployPipelineRequest {
42 name: name.to_string(),
43 source: source.to_string(),
44 };
45 let resp = self
46 .client
47 .post(&url)
48 .header("x-api-key", &self.api_key)
49 .json(&body)
50 .send()
51 .await?;
52
53 if !resp.status().is_success() {
54 let status = resp.status().as_u16();
55 let text = resp.text().await.unwrap_or_default();
56 return Err(ClientError::Api {
57 status,
58 message: text,
59 });
60 }
61
62 Ok(resp.json().await?)
63 }
64
65 pub async fn list_pipelines(&self) -> Result<PipelineListResponse, ClientError> {
67 let url = format!("{}/api/v1/pipelines", self.base_url);
68 let resp = self
69 .client
70 .get(&url)
71 .header("x-api-key", &self.api_key)
72 .send()
73 .await?;
74
75 if !resp.status().is_success() {
76 let status = resp.status().as_u16();
77 let text = resp.text().await.unwrap_or_default();
78 return Err(ClientError::Api {
79 status,
80 message: text,
81 });
82 }
83
84 Ok(resp.json().await?)
85 }
86
87 pub async fn delete_pipeline(&self, pipeline_id: &str) -> Result<(), ClientError> {
89 let url = format!("{}/api/v1/pipelines/{}", self.base_url, pipeline_id);
90 let resp = self
91 .client
92 .delete(&url)
93 .header("x-api-key", &self.api_key)
94 .send()
95 .await?;
96
97 if !resp.status().is_success() {
98 let status = resp.status().as_u16();
99 let text = resp.text().await.unwrap_or_default();
100 return Err(ClientError::Api {
101 status,
102 message: text,
103 });
104 }
105
106 Ok(())
107 }
108
109 pub fn logs_url(&self, pipeline_id: &str) -> String {
112 format!("{}/api/v1/pipelines/{}/logs", self.base_url, pipeline_id)
113 }
114
115 pub fn api_key(&self) -> &str {
117 &self.api_key
118 }
119
120 pub async fn get_usage(&self) -> Result<UsageResponse, ClientError> {
122 let url = format!("{}/api/v1/usage", self.base_url);
123 let resp = self
124 .client
125 .get(&url)
126 .header("x-api-key", &self.api_key)
127 .send()
128 .await?;
129
130 if !resp.status().is_success() {
131 let status = resp.status().as_u16();
132 let text = resp.text().await.unwrap_or_default();
133 return Err(ClientError::Api {
134 status,
135 message: text,
136 });
137 }
138
139 Ok(resp.json().await?)
140 }
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api;
    use std::sync::Arc;
    use tokio::sync::RwLock;
    use varpulis_runtime::tenant::{TenantManager, TenantQuota};

    /// Boots an API server on an ephemeral localhost port with one tenant and
    /// returns `(base_url, api_key)` for reaching it.
    async fn start_test_server() -> (String, String) {
        let api_key = "test-client-key".to_string();

        let mut tenants = TenantManager::new();
        tenants
            .create_tenant("Test".into(), api_key.clone(), TenantQuota::default())
            .unwrap();
        let app = api::api_routes(Arc::new(RwLock::new(tenants)), None, None, None);

        // Port 0 asks the OS for a free port, so tests do not collide.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let base_url = {
            let addr = listener.local_addr().unwrap();
            format!("http://{addr}")
        };

        // The server task is detached.
        tokio::spawn(async move {
            axum::serve(listener, app.into_make_service())
                .await
                .unwrap();
        });

        (base_url, api_key)
    }

    /// Starts a fresh server and returns a client that uses its valid API key.
    async fn connected_client() -> VarpulisClient {
        let (base_url, api_key) = start_test_server().await;
        VarpulisClient::new(&base_url, &api_key)
    }

    /// A deployed pipeline reports as running and shows up in the listing.
    #[tokio::test]
    async fn test_client_deploy_and_list() {
        let client = connected_client().await;

        let deployed = client
            .deploy_pipeline("Test Pipeline", "stream A = Events .where(x > 1)")
            .await
            .unwrap();
        assert_eq!(deployed.name, "Test Pipeline");
        assert_eq!(deployed.status, "running");

        let listing = client.list_pipelines().await.unwrap();
        assert_eq!(listing.total, 1);
        assert_eq!(listing.pipelines[0].name, "Test Pipeline");
    }

    /// Deleting the only deployed pipeline leaves the listing empty.
    #[tokio::test]
    async fn test_client_delete_pipeline() {
        let client = connected_client().await;

        let deployed = client
            .deploy_pipeline("ToDelete", "stream B = Events .where(y > 2)")
            .await
            .unwrap();
        client.delete_pipeline(&deployed.id).await.unwrap();

        let listing = client.list_pipelines().await.unwrap();
        assert_eq!(listing.total, 0);
    }

    /// A fresh tenant reports zero active pipelines.
    #[tokio::test]
    async fn test_client_get_usage() {
        let client = connected_client().await;

        let usage = client.get_usage().await.unwrap();
        assert_eq!(usage.active_pipelines, 0);
    }

    /// An unknown API key is rejected with HTTP 401 as `ClientError::Api`.
    #[tokio::test]
    async fn test_client_invalid_api_key() {
        let (base_url, _) = start_test_server().await;
        let client = VarpulisClient::new(&base_url, "wrong-key");

        let result = client.list_pipelines().await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        if let ClientError::Api { status, .. } = err {
            assert_eq!(status, 401);
        } else {
            let other = err;
            panic!("Expected Api error, got: {other:?}");
        }
    }
}