// varpulis_cli/client.rs

1//! HTTP client for interacting with a remote Varpulis server.
2
3use crate::api::{
4    DeployPipelineRequest, DeployPipelineResponse, PipelineListResponse, UsageResponse,
5};
6use thiserror::Error;
7
8#[derive(Debug, Error)]
9pub enum ClientError {
10    #[error("HTTP error: {0}")]
11    Http(#[from] reqwest::Error),
12
13    #[error("API error ({status}): {message}")]
14    Api { status: u16, message: String },
15}
16
17/// Client for the Varpulis REST API.
18#[derive(Debug)]
19pub struct VarpulisClient {
20    client: reqwest::Client,
21    base_url: String,
22    api_key: String,
23}
24
25impl VarpulisClient {
26    pub fn new(base_url: &str, api_key: &str) -> Self {
27        Self {
28            client: reqwest::Client::new(),
29            base_url: base_url.trim_end_matches('/').to_string(),
30            api_key: api_key.to_string(),
31        }
32    }
33
34    /// Deploy a pipeline to the remote server.
35    pub async fn deploy_pipeline(
36        &self,
37        name: &str,
38        source: &str,
39    ) -> Result<DeployPipelineResponse, ClientError> {
40        let url = format!("{}/api/v1/pipelines", self.base_url);
41        let body = DeployPipelineRequest {
42            name: name.to_string(),
43            source: source.to_string(),
44        };
45        let resp = self
46            .client
47            .post(&url)
48            .header("x-api-key", &self.api_key)
49            .json(&body)
50            .send()
51            .await?;
52
53        if !resp.status().is_success() {
54            let status = resp.status().as_u16();
55            let text = resp.text().await.unwrap_or_default();
56            return Err(ClientError::Api {
57                status,
58                message: text,
59            });
60        }
61
62        Ok(resp.json().await?)
63    }
64
65    /// List all pipelines for the authenticated tenant.
66    pub async fn list_pipelines(&self) -> Result<PipelineListResponse, ClientError> {
67        let url = format!("{}/api/v1/pipelines", self.base_url);
68        let resp = self
69            .client
70            .get(&url)
71            .header("x-api-key", &self.api_key)
72            .send()
73            .await?;
74
75        if !resp.status().is_success() {
76            let status = resp.status().as_u16();
77            let text = resp.text().await.unwrap_or_default();
78            return Err(ClientError::Api {
79                status,
80                message: text,
81            });
82        }
83
84        Ok(resp.json().await?)
85    }
86
87    /// Delete a pipeline by ID.
88    pub async fn delete_pipeline(&self, pipeline_id: &str) -> Result<(), ClientError> {
89        let url = format!("{}/api/v1/pipelines/{}", self.base_url, pipeline_id);
90        let resp = self
91            .client
92            .delete(&url)
93            .header("x-api-key", &self.api_key)
94            .send()
95            .await?;
96
97        if !resp.status().is_success() {
98            let status = resp.status().as_u16();
99            let text = resp.text().await.unwrap_or_default();
100            return Err(ClientError::Api {
101                status,
102                message: text,
103            });
104        }
105
106        Ok(())
107    }
108
109    /// Stream logs for a pipeline via SSE.
110    /// Returns the URL for SSE connection (caller handles streaming).
111    pub fn logs_url(&self, pipeline_id: &str) -> String {
112        format!("{}/api/v1/pipelines/{}/logs", self.base_url, pipeline_id)
113    }
114
115    /// Get the API key for SSE header authentication.
116    pub fn api_key(&self) -> &str {
117        &self.api_key
118    }
119
120    /// Get usage statistics for the authenticated tenant.
121    pub async fn get_usage(&self) -> Result<UsageResponse, ClientError> {
122        let url = format!("{}/api/v1/usage", self.base_url);
123        let resp = self
124            .client
125            .get(&url)
126            .header("x-api-key", &self.api_key)
127            .send()
128            .await?;
129
130        if !resp.status().is_success() {
131            let status = resp.status().as_u16();
132            let text = resp.text().await.unwrap_or_default();
133            return Err(ClientError::Api {
134                status,
135                message: text,
136            });
137        }
138
139        Ok(resp.json().await?)
140    }
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api;
    use std::sync::Arc;
    use tokio::sync::RwLock;
    use varpulis_runtime::tenant::{TenantManager, TenantQuota};

    /// Start an in-process API server on a localhost port and register one tenant.
    ///
    /// Returns `(base_url, api_key)` for that tenant.
    async fn start_test_server() -> (String, String) {
        let api_key = String::from("test-client-key");

        let mut tenants = TenantManager::new();
        tenants
            .create_tenant("Test".into(), api_key.clone(), TenantQuota::default())
            .unwrap();
        let app = api::api_routes(Arc::new(RwLock::new(tenants)), None, None, None);

        // Binding to port 0 asks the OS for any free port; the real address is
        // read back from the listener.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        tokio::spawn(async move {
            axum::serve(listener, app.into_make_service())
                .await
                .unwrap();
        });

        (format!("http://{addr}"), api_key)
    }

    /// A deployed pipeline is reported as running and shows up in the listing.
    #[tokio::test]
    async fn test_client_deploy_and_list() {
        let (base_url, api_key) = start_test_server().await;
        let client = VarpulisClient::new(&base_url, &api_key);

        // Deploy
        let deployed = client
            .deploy_pipeline("Test Pipeline", "stream A = Events .where(x > 1)")
            .await
            .unwrap();
        assert_eq!(deployed.name, "Test Pipeline");
        assert_eq!(deployed.status, "running");

        // List
        let listing = client.list_pipelines().await.unwrap();
        assert_eq!(listing.total, 1);
        assert_eq!(listing.pipelines[0].name, "Test Pipeline");
    }

    /// Deleting the only pipeline leaves the listing empty.
    #[tokio::test]
    async fn test_client_delete_pipeline() {
        let (base_url, api_key) = start_test_server().await;
        let client = VarpulisClient::new(&base_url, &api_key);

        let deployed = client
            .deploy_pipeline("ToDelete", "stream B = Events .where(y > 2)")
            .await
            .unwrap();

        client.delete_pipeline(&deployed.id).await.unwrap();

        let listing = client.list_pipelines().await.unwrap();
        assert_eq!(listing.total, 0);
    }

    /// A tenant with no deployments reports zero active pipelines.
    #[tokio::test]
    async fn test_client_get_usage() {
        let (base_url, api_key) = start_test_server().await;
        let client = VarpulisClient::new(&base_url, &api_key);

        let usage = client.get_usage().await.unwrap();
        assert_eq!(usage.active_pipelines, 0);
    }

    /// A request with an unknown API key fails with an `Api` error carrying 401.
    #[tokio::test]
    async fn test_client_invalid_api_key() {
        let (base_url, _) = start_test_server().await;
        let client = VarpulisClient::new(&base_url, "wrong-key");

        let result = client.list_pipelines().await;
        assert!(result.is_err());
        match result.unwrap_err() {
            ClientError::Api { status, .. } => assert_eq!(status, 401),
            other => panic!("Expected Api error, got: {other:?}"),
        }
    }
}