Skip to main content

varpulis_cli/
client.rs

1//! HTTP client for interacting with a remote Varpulis server.
2
3use thiserror::Error;
4
5use crate::api::{
6    DeployPipelineRequest, DeployPipelineResponse, PipelineListResponse, UsageResponse,
7};
8
9#[derive(Debug, Error)]
10pub enum ClientError {
11    #[error("HTTP error: {0}")]
12    Http(#[from] reqwest::Error),
13
14    #[error("API error ({status}): {message}")]
15    Api { status: u16, message: String },
16}
17
18/// Client for the Varpulis REST API.
19#[derive(Debug)]
20pub struct VarpulisClient {
21    client: reqwest::Client,
22    base_url: String,
23    api_key: String,
24}
25
26impl VarpulisClient {
27    pub fn new(base_url: &str, api_key: &str) -> Self {
28        Self {
29            client: reqwest::Client::new(),
30            base_url: base_url.trim_end_matches('/').to_string(),
31            api_key: api_key.to_string(),
32        }
33    }
34
35    /// Deploy a pipeline to the remote server.
36    pub async fn deploy_pipeline(
37        &self,
38        name: &str,
39        source: &str,
40    ) -> Result<DeployPipelineResponse, ClientError> {
41        let url = format!("{}/api/v1/pipelines", self.base_url);
42        let body = DeployPipelineRequest {
43            name: name.to_string(),
44            source: source.to_string(),
45        };
46        let resp = self
47            .client
48            .post(&url)
49            .header("x-api-key", &self.api_key)
50            .json(&body)
51            .send()
52            .await?;
53
54        if !resp.status().is_success() {
55            let status = resp.status().as_u16();
56            let text = resp.text().await.unwrap_or_default();
57            return Err(ClientError::Api {
58                status,
59                message: text,
60            });
61        }
62
63        Ok(resp.json().await?)
64    }
65
66    /// List all pipelines for the authenticated tenant.
67    pub async fn list_pipelines(&self) -> Result<PipelineListResponse, ClientError> {
68        let url = format!("{}/api/v1/pipelines", self.base_url);
69        let resp = self
70            .client
71            .get(&url)
72            .header("x-api-key", &self.api_key)
73            .send()
74            .await?;
75
76        if !resp.status().is_success() {
77            let status = resp.status().as_u16();
78            let text = resp.text().await.unwrap_or_default();
79            return Err(ClientError::Api {
80                status,
81                message: text,
82            });
83        }
84
85        Ok(resp.json().await?)
86    }
87
88    /// Delete a pipeline by ID.
89    pub async fn delete_pipeline(&self, pipeline_id: &str) -> Result<(), ClientError> {
90        let url = format!("{}/api/v1/pipelines/{}", self.base_url, pipeline_id);
91        let resp = self
92            .client
93            .delete(&url)
94            .header("x-api-key", &self.api_key)
95            .send()
96            .await?;
97
98        if !resp.status().is_success() {
99            let status = resp.status().as_u16();
100            let text = resp.text().await.unwrap_or_default();
101            return Err(ClientError::Api {
102                status,
103                message: text,
104            });
105        }
106
107        Ok(())
108    }
109
110    /// Stream logs for a pipeline via SSE.
111    /// Returns the URL for SSE connection (caller handles streaming).
112    pub fn logs_url(&self, pipeline_id: &str) -> String {
113        format!("{}/api/v1/pipelines/{}/logs", self.base_url, pipeline_id)
114    }
115
116    /// Get the API key for SSE header authentication.
117    pub fn api_key(&self) -> &str {
118        &self.api_key
119    }
120
121    /// Get usage statistics for the authenticated tenant.
122    pub async fn get_usage(&self) -> Result<UsageResponse, ClientError> {
123        let url = format!("{}/api/v1/usage", self.base_url);
124        let resp = self
125            .client
126            .get(&url)
127            .header("x-api-key", &self.api_key)
128            .send()
129            .await?;
130
131        if !resp.status().is_success() {
132            let status = resp.status().as_u16();
133            let text = resp.text().await.unwrap_or_default();
134            return Err(ClientError::Api {
135                status,
136                message: text,
137            });
138        }
139
140        Ok(resp.json().await?)
141    }
142}
143
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use tokio::sync::RwLock;
    use varpulis_runtime::tenant::{TenantManager, TenantQuota};

    use super::*;
    use crate::api;

    /// Start an in-process API server on an OS-assigned local port with one
    /// registered tenant. Returns `(base_url, api_key)` for that tenant.
    async fn start_test_server() -> (String, String) {
        let mut tenants = TenantManager::new();
        let key = String::from("test-client-key");
        tenants
            .create_tenant("Test".into(), key.clone(), TenantQuota::default())
            .unwrap();

        let routes = api::api_routes(
            Arc::new(RwLock::new(tenants)),
            None,
            None,
            None,
            #[cfg(feature = "saas")]
            None,
        );

        // Port 0 lets the OS choose a free port; read the real address back.
        let socket = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local = socket.local_addr().unwrap();
        tokio::spawn(async move {
            axum::serve(socket, routes.into_make_service())
                .await
                .unwrap();
        });

        (format!("http://{local}"), key)
    }

    #[tokio::test]
    async fn test_client_deploy_and_list() {
        let (server, key) = start_test_server().await;
        let api_client = VarpulisClient::new(&server, &key);

        // A freshly deployed pipeline reports its name and a running status.
        let deployed = api_client
            .deploy_pipeline("Test Pipeline", "stream A = Events .where(x > 1)")
            .await
            .unwrap();
        assert_eq!(deployed.name, "Test Pipeline");
        assert_eq!(deployed.status, "running");

        // It must then be the only entry in the listing.
        let listing = api_client.list_pipelines().await.unwrap();
        assert_eq!(listing.total, 1);
        assert_eq!(listing.pipelines[0].name, "Test Pipeline");
    }

    #[tokio::test]
    async fn test_client_delete_pipeline() {
        let (server, key) = start_test_server().await;
        let api_client = VarpulisClient::new(&server, &key);

        let deployed = api_client
            .deploy_pipeline("ToDelete", "stream B = Events .where(y > 2)")
            .await
            .unwrap();

        api_client.delete_pipeline(&deployed.id).await.unwrap();

        // After deletion the tenant has no pipelines left.
        let listing = api_client.list_pipelines().await.unwrap();
        assert_eq!(listing.total, 0);
    }

    #[tokio::test]
    async fn test_client_get_usage() {
        let (server, key) = start_test_server().await;
        let api_client = VarpulisClient::new(&server, &key);

        let stats = api_client.get_usage().await.unwrap();
        assert_eq!(stats.active_pipelines, 0);
    }

    #[tokio::test]
    async fn test_client_invalid_api_key() {
        let (server, _) = start_test_server().await;
        let api_client = VarpulisClient::new(&server, "wrong-key");

        let outcome = api_client.list_pipelines().await;
        assert!(outcome.is_err());

        // Only an API-level rejection is acceptable here; anything else fails.
        let status = match outcome.unwrap_err() {
            ClientError::Api { status, .. } => status,
            other => panic!("Expected Api error, got: {other:?}"),
        };
        assert_eq!(status, 401);
    }
}
237}