scouter_client/http/
client.rs

1#![allow(clippy::useless_conversion)]
2use crate::error::ClientError;
3use pyo3::{prelude::*, IntoPyObjectExt};
4use scouter_settings::http::HTTPConfig;
5use scouter_types::contracts::{
6    DriftAlertRequest, DriftRequest, GetProfileRequest, ProfileRequest, ProfileStatusRequest,
7};
8use scouter_types::http::{RequestType, Routes};
9
10use crate::http::HTTPClient;
11use scouter_types::{
12    alert::Alert, custom::BinnedCustomMetrics, psi::BinnedPsiFeatureMetrics, spc::SpcDriftFeatures,
13    DriftProfile, DriftType, ProfileFuncs,
14};
15use std::path::PathBuf;
16use tracing::{debug, error};
17
/// Download chunk size in bytes (5 MiB).
///
/// NOTE(review): not referenced by any code visible in this file; presumably
/// consumed by download logic elsewhere in the crate — confirm before changing.
pub const DOWNLOAD_CHUNK_SIZE: usize = 5 * 1024 * 1024;
19
20#[derive(Debug, Clone)]
21pub struct ScouterClient {
22    client: HTTPClient,
23}
24
25impl ScouterClient {
26    pub fn new(config: Option<HTTPConfig>) -> Result<Self, ClientError> {
27        let client = HTTPClient::new(config.unwrap_or_default())?;
28
29        Ok(ScouterClient { client })
30    }
31
32    /// Insert a profile into the scouter server
33    pub fn insert_profile(&self, request: &ProfileRequest) -> Result<bool, ClientError> {
34        let response = self.client.request(
35            Routes::Profile,
36            RequestType::Post,
37            Some(serde_json::to_value(request).unwrap()),
38            None,
39            None,
40        )?;
41
42        if response.status().is_success() {
43            Ok(true)
44        } else {
45            Err(ClientError::InsertProfileError)
46        }
47    }
48
49    pub fn update_profile_status(
50        &self,
51        request: &ProfileStatusRequest,
52    ) -> Result<bool, ClientError> {
53        let response = self.client.request(
54            Routes::ProfileStatus,
55            RequestType::Put,
56            Some(serde_json::to_value(request).unwrap()),
57            None,
58            None,
59        )?;
60
61        if response.status().is_success() {
62            Ok(true)
63        } else {
64            Err(ClientError::UpdateProfileError)
65        }
66    }
67
68    pub fn get_alerts(&self, request: &DriftAlertRequest) -> Result<Vec<Alert>, ClientError> {
69        debug!("Getting alerts for: {:?}", request);
70
71        let query_string = serde_qs::to_string(request)?;
72
73        let response = self.client.request(
74            Routes::Alerts,
75            RequestType::Get,
76            None,
77            Some(query_string),
78            None,
79        )?;
80
81        // Check response status
82        if !response.status().is_success() {
83            return Err(ClientError::GetDriftAlertError);
84        }
85
86        // Parse response body
87        let body: serde_json::Value = response.json()?;
88
89        // Extract alerts from response
90        let alerts = body
91            .get("alerts")
92            .map(|alerts| {
93                serde_json::from_value::<Vec<Alert>>(alerts.clone()).inspect_err(|e| {
94                    error!(
95                        "Failed to parse drift alerts {:?}. Error: {:?}",
96                        &request, e
97                    );
98                })
99            })
100            .unwrap_or_else(|| {
101                error!("No alerts found in response");
102                Ok(Vec::new())
103            })?;
104
105        Ok(alerts)
106    }
107
108    pub fn get_drift_profile(
109        &self,
110        request: GetProfileRequest,
111    ) -> Result<DriftProfile, ClientError> {
112        let query_string = serde_qs::to_string(&request)?;
113
114        let response = self.client.request(
115            Routes::Profile,
116            RequestType::Get,
117            None,
118            Some(query_string),
119            None,
120        )?;
121
122        // Early return for error status codes
123        if !response.status().is_success() {
124            error!("Failed to get profile. Status: {:?}", response.status());
125            return Err(ClientError::GetDriftProfileError);
126        }
127
128        // Get response body
129        let body = response.bytes()?;
130
131        // Parse JSON response
132        let profile: DriftProfile = serde_json::from_slice(&body)?;
133
134        Ok(profile)
135    }
136
137    /// Check if the scouter server is healthy
138    pub fn check_service_health(&self) -> Result<bool, ClientError> {
139        let response = self
140            .client
141            .request(Routes::Healthcheck, RequestType::Get, None, None, None)
142            .inspect_err(|e| {
143                error!("Failed to check scouter health {}", e);
144            })?;
145
146        if response.status() == 200 {
147            Ok(true)
148        } else {
149            Ok(false)
150        }
151    }
152}
153
154#[pyclass(name = "ScouterClient")]
155pub struct PyScouterClient {
156    client: ScouterClient,
157}
158#[pymethods]
159impl PyScouterClient {
160    #[new]
161    #[pyo3(signature = (config=None))]
162    pub fn new(config: Option<&Bound<'_, PyAny>>) -> Result<Self, ClientError> {
163        let config = config.map_or(Ok(HTTPConfig::default()), |unwrapped| {
164            if unwrapped.is_instance_of::<HTTPConfig>() {
165                unwrapped.extract::<HTTPConfig>()
166            } else {
167                Err(ClientError::InvalidConfigTypeError.into())
168            }
169        })?;
170
171        let client = ScouterClient::new(Some(config.clone()))?;
172
173        Ok(PyScouterClient { client })
174    }
175
176    /// Insert a profile into the scouter server
177    ///
178    /// # Arguments
179    ///
180    /// * `profile` - A profile object to insert
181    ///
182    /// # Returns
183    ///
184    /// * `Ok(())` if the profile was inserted successfully
185    #[pyo3(signature = (profile, set_active=false, deactivate_others=false))]
186    pub fn register_profile(
187        &self,
188        profile: &Bound<'_, PyAny>,
189        set_active: bool,
190        deactivate_others: bool,
191    ) -> Result<bool, ClientError> {
192        let request = profile
193            .call_method0("create_profile_request")?
194            .extract::<ProfileRequest>()?;
195
196        self.client.insert_profile(&request)?;
197
198        debug!("Profile inserted successfully");
199        if set_active {
200            let name = profile
201                .getattr("config")?
202                .getattr("name")?
203                .extract::<String>()?;
204
205            let space = profile
206                .getattr("config")?
207                .getattr("space")?
208                .extract::<String>()?;
209
210            let version = profile
211                .getattr("config")?
212                .getattr("version")?
213                .extract::<String>()?;
214
215            let drift_type = profile
216                .getattr("config")?
217                .getattr("drift_type")?
218                .extract::<DriftType>()?;
219
220            let request = ProfileStatusRequest {
221                name,
222                space,
223                version,
224                active: true,
225                drift_type: Some(drift_type),
226                deactivate_others,
227            };
228
229            self.client.update_profile_status(&request)?;
230        }
231
232        Ok(true)
233    }
234
235    /// Update the status of a profile
236    ///
237    /// # Arguments
238    /// * `request` - A profile status request object
239    ///
240    /// # Returns
241    /// * `Ok(())` if the profile status was updated successfully
242    pub fn update_profile_status(
243        &self,
244        request: ProfileStatusRequest,
245    ) -> Result<bool, ClientError> {
246        self.client.update_profile_status(&request)
247    }
248
249    /// Get binned drift data from the scouter server
250    ///
251    /// # Arguments
252    ///
253    /// * `drift_request` - A drift request object
254    ///
255    /// # Returns
256    ///
257    /// * A binned drift object
258    pub fn get_binned_drift<'py>(
259        &self,
260        py: Python<'py>,
261        drift_request: DriftRequest,
262    ) -> Result<Bound<'py, PyAny>, ClientError> {
263        match drift_request.drift_type {
264            DriftType::Spc => {
265                PyScouterClient::get_spc_binned_drift(py, &self.client.client, drift_request)
266            }
267            DriftType::Psi => {
268                PyScouterClient::get_psi_binned_drift(py, &self.client.client, drift_request)
269            }
270            DriftType::Custom => {
271                PyScouterClient::get_custom_binned_drift(py, &self.client.client, drift_request)
272            }
273        }
274    }
275
276    pub fn get_alerts(&self, request: DriftAlertRequest) -> Result<Vec<Alert>, ClientError> {
277        debug!("Getting alerts for: {:?}", request);
278
279        let alerts = self.client.get_alerts(&request)?;
280
281        Ok(alerts)
282    }
283
284    #[pyo3(signature = (request, path))]
285    pub fn download_profile(
286        &self,
287        request: GetProfileRequest,
288        path: Option<PathBuf>,
289    ) -> Result<String, ClientError> {
290        debug!("Downloading profile: {:?}", request);
291
292        let filename = format!(
293            "{}_{}_{}_{}.json",
294            request.name, request.space, request.version, request.drift_type
295        );
296
297        let profile = self.client.get_drift_profile(request)?;
298
299        ProfileFuncs::save_to_json(profile, path.clone(), &filename)?;
300
301        Ok(path.map_or(filename, |p| p.to_string_lossy().to_string()))
302    }
303}
304
305impl PyScouterClient {
306    fn get_spc_binned_drift<'py>(
307        py: Python<'py>,
308        client: &HTTPClient,
309        drift_request: DriftRequest,
310    ) -> Result<Bound<'py, PyAny>, ClientError> {
311        let query_string = serde_qs::to_string(&drift_request)?;
312
313        let response = client.request(
314            Routes::SpcDrift,
315            RequestType::Get,
316            None,
317            Some(query_string),
318            None,
319        )?;
320
321        if response.status().is_client_error() || response.status().is_server_error() {
322            return Err(ClientError::GetDriftDataError);
323        }
324
325        let body = response.bytes()?;
326
327        let results: SpcDriftFeatures = serde_json::from_slice(&body)?;
328
329        Ok(results.into_bound_py_any(py).unwrap())
330    }
331    fn get_psi_binned_drift<'py>(
332        py: Python<'py>,
333        client: &HTTPClient,
334        drift_request: DriftRequest,
335    ) -> Result<Bound<'py, PyAny>, ClientError> {
336        let query_string = serde_qs::to_string(&drift_request)?;
337
338        let response = client.request(
339            Routes::PsiDrift,
340            RequestType::Get,
341            None,
342            Some(query_string),
343            None,
344        )?;
345
346        if response.status().is_client_error() || response.status().is_server_error() {
347            // print response text
348            error!(
349                "Failed to get PSI drift data. Status: {:?}",
350                response.status()
351            );
352            error!("Response text: {:?}", response.text());
353            return Err(ClientError::GetDriftDataError);
354        }
355
356        let body = response.bytes()?;
357
358        let results: BinnedPsiFeatureMetrics = serde_json::from_slice(&body)?;
359
360        Ok(results.into_bound_py_any(py).unwrap())
361    }
362
363    fn get_custom_binned_drift<'py>(
364        py: Python<'py>,
365        client: &HTTPClient,
366        drift_request: DriftRequest,
367    ) -> Result<Bound<'py, PyAny>, ClientError> {
368        let query_string = serde_qs::to_string(&drift_request)?;
369
370        let response = client.request(
371            Routes::CustomDrift,
372            RequestType::Get,
373            None,
374            Some(query_string),
375            None,
376        )?;
377
378        if response.status().is_client_error() || response.status().is_server_error() {
379            return Err(ClientError::GetDriftDataError);
380        }
381
382        let body = response.bytes()?;
383
384        let results: BinnedCustomMetrics = serde_json::from_slice(&body)?;
385
386        Ok(results.into_bound_py_any(py).unwrap())
387    }
388}