1use crate::error::{ProfileError, TypeError, UtilError};
2use crate::FeatureMap;
3use crate::{CommonCrons, DriftType};
4use base64::prelude::*;
5use chrono::{DateTime, Utc};
6use colored_json::{Color, ColorMode, ColoredFormatter, PrettyFormatter, Styler};
7use opentelemetry::Key;
8use opentelemetry::KeyValue;
9use opentelemetry::Value as OTelValue;
10use opentelemetry_proto::tonic::common::v1::{any_value::Value as AnyValueVariant, AnyValue};
11use pyo3::exceptions::PyRuntimeError;
12use pyo3::prelude::*;
13use pyo3::types::{PyBool, PyDict, PyFloat, PyInt, PyList, PyString};
14use pyo3::IntoPyObjectExt;
15use rayon::prelude::*;
16use serde::{Deserialize, Serialize};
17use serde_json::{json, Value};
18use std::collections::{BTreeSet, HashMap};
19use std::fmt::{Display, Formatter};
20use std::path::PathBuf;
21use std::str::FromStr;
22
/// Sentinel string `"__missing__"`.
///
/// NOTE(review): nothing in this part of the file reads it; presumably it marks
/// absent values elsewhere in the crate — confirm at the call sites.
pub const MISSING: &str = "__missing__";

/// Version string `"0.0.0"`.
///
/// NOTE(review): presumably the fallback used when no explicit version is
/// supplied — confirm against callers.
pub const DEFAULT_VERSION: &str = "0.0.0";
25
26pub fn scouter_version() -> String {
27 env!("CARGO_PKG_VERSION").to_string()
28}
29
/// Identifiers for the JSON files named by [`FileName::to_str`].
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so values can be logged and
/// compared, in line with the other enums in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FileName {
    SpcDriftMap,
    SpcDriftProfile,
    PsiDriftMap,
    PsiDriftProfile,
    CustomDriftProfile,
    DriftProfile,
    DataProfile,
    LLMDriftProfile,
}

impl FileName {
    /// Returns the file name for this variant, including the `.json` extension.
    pub fn to_str(&self) -> &'static str {
        match self {
            Self::SpcDriftMap => "spc_drift_map.json",
            Self::SpcDriftProfile => "spc_drift_profile.json",
            Self::PsiDriftMap => "psi_drift_map.json",
            Self::PsiDriftProfile => "psi_drift_profile.json",
            Self::CustomDriftProfile => "custom_drift_profile.json",
            Self::DriftProfile => "drift_profile.json",
            Self::DataProfile => "data_profile.json",
            Self::LLMDriftProfile => "llm_drift_profile.json",
        }
    }
}
55
56pub struct PyHelperFuncs {}
57
58impl PyHelperFuncs {
59 pub fn __str__<T: Serialize>(object: T) -> String {
60 match ColoredFormatter::with_styler(
61 PrettyFormatter::default(),
62 Styler {
63 key: Color::Rgb(245, 77, 85).bold(),
64 string_value: Color::Rgb(249, 179, 93).foreground(),
65 float_value: Color::Rgb(249, 179, 93).foreground(),
66 integer_value: Color::Rgb(249, 179, 93).foreground(),
67 bool_value: Color::Rgb(249, 179, 93).foreground(),
68 nil_value: Color::Rgb(249, 179, 93).foreground(),
69 ..Default::default()
70 },
71 )
72 .to_colored_json(&object, ColorMode::On)
73 {
74 Ok(json) => json,
75 Err(e) => format!("Failed to serialize to json: {e}"),
76 }
77 }
79
80 pub fn __json__<T: Serialize>(object: T) -> String {
81 match serde_json::to_string_pretty(&object) {
82 Ok(json) => json,
83 Err(e) => format!("Failed to serialize to json: {e}"),
84 }
85 }
86
87 pub fn save_to_json<T>(
88 model: T,
89 path: Option<PathBuf>,
90 filename: &str,
91 ) -> Result<PathBuf, UtilError>
92 where
93 T: Serialize,
94 {
95 let json = serde_json::to_string_pretty(&model)?;
97
98 let write_path = if path.is_some() {
100 let mut new_path = path.ok_or(UtilError::CreatePathError)?;
101
102 new_path.set_extension("json");
104
105 if !new_path.exists() {
106 let parent_path = new_path.parent().ok_or(UtilError::GetParentPathError)?;
108
109 std::fs::create_dir_all(parent_path)
110 .map_err(|_| UtilError::CreateDirectoryError)?;
111 }
112
113 new_path
114 } else {
115 PathBuf::from(filename)
116 };
117
118 std::fs::write(&write_path, json)?;
119
120 Ok(write_path)
121 }
122}
123
124pub fn json_to_pyobject(py: Python, value: &Value, dict: &Bound<'_, PyDict>) -> PyResult<()> {
125 match value {
126 Value::Object(map) => {
127 for (k, v) in map {
128 let py_value = match v {
129 Value::Null => py.None(),
130 Value::Bool(b) => b.into_py_any(py).unwrap(),
131 Value::Number(n) => {
132 if let Some(i) = n.as_i64() {
133 i.into_py_any(py).unwrap()
134 } else if let Some(f) = n.as_f64() {
135 f.into_py_any(py).unwrap()
136 } else {
137 return Err(PyRuntimeError::new_err(
138 "Invalid number type, expected i64 or f64",
139 ));
140 }
141 }
142 Value::String(s) => s.into_py_any(py).unwrap(),
143 Value::Array(arr) => {
144 let py_list = PyList::empty(py);
145 for item in arr {
146 let py_item = json_to_pyobject_value(py, item)?;
147 py_list.append(py_item)?;
148 }
149 py_list.into_py_any(py).unwrap()
150 }
151 Value::Object(_) => {
152 let nested_dict = PyDict::new(py);
153 json_to_pyobject(py, v, &nested_dict)?;
154 nested_dict.into_py_any(py).unwrap()
155 }
156 };
157 dict.set_item(k, py_value)?;
158 }
159 }
160 _ => return Err(PyRuntimeError::new_err("Root must be object")),
161 }
162 Ok(())
163}
164
165pub fn json_to_pyobject_value(py: Python, value: &Value) -> PyResult<Py<PyAny>> {
166 Ok(match value {
167 Value::Null => py.None(),
168 Value::Bool(b) => b.into_py_any(py).unwrap(),
169 Value::Number(n) => {
170 if let Some(i) = n.as_i64() {
171 i.into_py_any(py).unwrap()
172 } else if let Some(f) = n.as_f64() {
173 f.into_py_any(py).unwrap()
174 } else {
175 return Err(PyRuntimeError::new_err(
176 "Invalid number type, expected i64 or f64",
177 ));
178 }
179 }
180 Value::String(s) => s.into_py_any(py).unwrap(),
181 Value::Array(arr) => {
182 let py_list = PyList::empty(py);
183 for item in arr {
184 let py_item = json_to_pyobject_value(py, item)?;
185 py_list.append(py_item)?;
186 }
187 py_list.into_py_any(py).unwrap()
188 }
189 Value::Object(_) => {
190 let nested_dict = PyDict::new(py);
191 json_to_pyobject(py, value, &nested_dict)?;
192 nested_dict.into_py_any(py).unwrap()
193 }
194 })
195}
196
197pub fn pyobject_to_json(obj: &Bound<'_, PyAny>) -> Result<Value, TypeError> {
198 if obj.is_instance_of::<PyDict>() {
199 let dict = obj.downcast::<PyDict>()?;
200 let mut map = serde_json::Map::new();
201 for (key, value) in dict.iter() {
202 let key_str = key.extract::<String>()?;
203 let json_value = pyobject_to_json(&value)?;
204 map.insert(key_str, json_value);
205 }
206 Ok(Value::Object(map))
207 } else if obj.is_instance_of::<PyList>() {
208 let list = obj.downcast::<PyList>()?;
209 let mut vec = Vec::new();
210 for item in list.iter() {
211 vec.push(pyobject_to_json(&item)?);
212 }
213 Ok(Value::Array(vec))
214 } else if obj.is_instance_of::<PyString>() {
215 let s = obj.extract::<String>()?;
216 Ok(Value::String(s))
217 } else if obj.is_instance_of::<PyFloat>() {
218 let f = obj.extract::<f64>()?;
219 Ok(json!(f))
220 } else if obj.is_instance_of::<PyBool>() {
221 let b = obj.extract::<bool>()?;
222 Ok(json!(b))
223 } else if obj.is_instance_of::<PyInt>() {
224 let i = obj.extract::<i64>()?;
225 Ok(json!(i))
226 } else if obj.is_none() {
227 Ok(Value::Null)
228 } else {
229 Err(TypeError::UnsupportedPyObjectType)
230 }
231}
232
233pub fn pyobject_to_tracing_json(
236 obj: &Bound<'_, PyAny>,
237 max_length: &usize,
238) -> Result<Value, TypeError> {
239 let py = obj.py();
241
242 if is_pydantic_basemodel(py, obj)? {
243 let dict = obj.call_method0("model_dump")?;
244 return pyobject_to_tracing_json(&dict, max_length);
245 }
246 if obj.is_instance_of::<PyDict>() {
247 let dict = obj.downcast::<PyDict>()?;
248 let mut map = serde_json::Map::new();
249 for (key, value) in dict.iter() {
250 let key = pyobject_to_tracing_json(&key, max_length)?;
251 let key_str = match key {
253 Value::String(s) => s,
254 Value::Number(n) => n.to_string(),
255 Value::Bool(b) => b.to_string(),
256 _ => return Err(TypeError::InvalidDictKeyType),
257 };
258 let json_value = pyobject_to_tracing_json(&value, max_length)?;
259 map.insert(key_str.to_string(), json_value);
260 }
261 Ok(Value::Object(map))
262 } else if obj.is_instance_of::<PyList>() {
263 let list = obj.downcast::<PyList>()?;
264 let mut vec = Vec::new();
265 for item in list.iter() {
266 vec.push(pyobject_to_tracing_json(&item, max_length)?);
267 }
268 Ok(Value::Array(vec))
269 } else if obj.is_instance_of::<PyString>() {
270 let s = obj.extract::<String>()?;
271 let truncated = if s.len() > *max_length {
272 format!("{}...[truncated]", &s[..*max_length])
273 } else {
274 s
275 };
276 Ok(Value::String(truncated))
277 } else if obj.is_instance_of::<PyFloat>() {
278 let f = obj.extract::<f64>()?;
279 Ok(json!(f))
280 } else if obj.is_instance_of::<PyBool>() {
281 let b = obj.extract::<bool>()?;
282 Ok(json!(b))
283 } else if obj.is_instance_of::<PyInt>() {
284 let i = obj.extract::<i64>()?;
285 Ok(json!(i))
286 } else if obj.is_none() {
287 Ok(Value::Null)
288 } else {
289 let ty = match obj.get_type().name() {
292 Ok(name) => name.to_string(),
293 Err(_) => "unknown".to_string(),
294 };
295
296 Ok(Value::String(ty))
297 }
298}
299
300pub fn pyobject_to_otel_value(obj: &Bound<'_, PyAny>) -> Result<OTelValue, TypeError> {
301 if obj.is_instance_of::<PyBool>() {
302 let b = obj.extract::<bool>()?;
303 Ok(OTelValue::Bool(b))
304 } else if obj.is_instance_of::<PyInt>() {
305 let i = obj.extract::<i64>()?;
306 Ok(OTelValue::I64(i))
307 } else if obj.is_instance_of::<PyFloat>() {
308 let f = obj.extract::<f64>()?;
309 Ok(OTelValue::F64(f))
310 } else if obj.is_instance_of::<PyString>() {
311 let s = obj.extract::<String>()?;
312 Ok(OTelValue::String(opentelemetry::StringValue::from(s)))
313 } else if obj.is_instance_of::<PyList>() {
314 let list = obj.downcast::<PyList>()?;
315 pylist_to_otel_array(list)
316 } else if obj.is_none() {
317 Ok(OTelValue::String(opentelemetry::StringValue::from("null")))
319 } else {
320 let json_value = pyobject_to_json(obj)?;
321 Ok(OTelValue::String(opentelemetry::StringValue::from(
322 json_value.to_string(),
323 )))
324 }
325}
326
327fn flatten_nested_dict(
328 obj: &Bound<'_, PyDict>,
329 prefix: Option<String>,
330) -> Result<Vec<KeyValue>, TypeError> {
331 let mut result = Vec::new();
332
333 for (key, value) in obj.iter() {
334 let key_str = key.extract::<String>()?;
335 let full_key = if let Some(ref p) = prefix {
336 format!("{}.{}", p, key_str)
337 } else {
338 key_str
339 };
340
341 if value.is_instance_of::<PyDict>() {
342 let nested_dict = value.downcast::<PyDict>()?;
343 result.extend(flatten_nested_dict(nested_dict, Some(full_key))?);
344 } else {
345 let otel_value = pyobject_to_otel_value(&value)?;
346 result.push(KeyValue::new(Key::new(full_key), otel_value));
347 }
348 }
349
350 Ok(result)
351}
352
353pub fn pydict_to_otel_keyvalue(obj: &Bound<'_, PyDict>) -> Result<Vec<KeyValue>, TypeError> {
354 flatten_nested_dict(obj, None)
355}
356
357fn pylist_to_otel_array(list: &Bound<'_, PyList>) -> Result<OTelValue, TypeError> {
359 if list.is_empty() {
360 return Ok(OTelValue::Array(opentelemetry::Array::String(Vec::new())));
362 }
363
364 let first_item = list.get_item(0)?;
366
367 if first_item.is_instance_of::<PyBool>() {
368 let mut bools = Vec::new();
369 for item in list.iter() {
370 if item.is_instance_of::<PyBool>() {
371 bools.push(item.extract::<bool>()?);
372 } else {
373 return pylist_to_string_array(list);
375 }
376 }
377 Ok(OTelValue::Array(opentelemetry::Array::Bool(bools)))
378 } else if first_item.is_instance_of::<PyInt>() {
379 let mut ints = Vec::new();
380 for item in list.iter() {
381 if item.is_instance_of::<PyInt>() {
382 ints.push(item.extract::<i64>()?);
383 } else {
384 return pylist_to_string_array(list);
386 }
387 }
388 Ok(OTelValue::Array(opentelemetry::Array::I64(ints)))
389 } else if first_item.is_instance_of::<PyFloat>() {
390 let mut floats = Vec::new();
391 for item in list.iter() {
392 if item.is_instance_of::<PyFloat>() {
393 floats.push(item.extract::<f64>()?);
394 } else {
395 return pylist_to_string_array(list);
397 }
398 }
399 Ok(OTelValue::Array(opentelemetry::Array::F64(floats)))
400 } else {
401 pylist_to_string_array(list)
403 }
404}
405
406fn pylist_to_string_array(list: &Bound<'_, PyList>) -> Result<OTelValue, TypeError> {
408 let mut strings = Vec::new();
409 for item in list.iter() {
410 let string_val = if item.is_instance_of::<PyString>() {
411 item.extract::<String>()?
412 } else {
413 pyobject_to_json(&item)?.to_string()
415 };
416 strings.push(opentelemetry::StringValue::from(string_val));
417 }
418 Ok(OTelValue::Array(opentelemetry::Array::String(strings)))
419}
420
421pub fn otel_value_to_serde_value(otel_value: &AnyValue) -> Value {
426 match &otel_value.value {
427 Some(variant) => match variant {
428 AnyValueVariant::BoolValue(b) => Value::Bool(*b),
429 AnyValueVariant::IntValue(i) => Value::Number(serde_json::Number::from(*i)),
430 AnyValueVariant::DoubleValue(d) => {
431 serde_json::Number::from_f64(*d)
433 .map(Value::Number)
434 .unwrap_or(Value::Null)
435 }
436 AnyValueVariant::StringValue(s) => Value::String(s.clone()),
437 AnyValueVariant::ArrayValue(array) => {
438 let values: Vec<Value> =
439 array.values.iter().map(otel_value_to_serde_value).collect();
440 Value::Array(values)
441 }
442 AnyValueVariant::KvlistValue(kvlist) => {
443 let mut map = serde_json::Map::new();
444 for kv in &kvlist.values {
445 if let Some(value) = &kv.value {
446 map.insert(kv.key.clone(), otel_value_to_serde_value(value));
447 }
448 }
449 Value::Object(map)
450 }
451 AnyValueVariant::BytesValue(bytes) => {
452 Value::String(BASE64_STANDARD.encode(bytes))
454 }
455 },
456 None => Value::Null,
457 }
458}
459
460pub fn create_feature_map(
461 features: &[String],
462 array: &[Vec<String>],
463) -> Result<FeatureMap, ProfileError> {
464 if features.len() != array.len() {
466 return Err(ProfileError::FeatureArrayLengthError);
467 };
468
469 let feature_map = array
470 .par_iter()
471 .enumerate()
472 .map(|(i, col)| {
473 let unique = col
474 .iter()
475 .collect::<BTreeSet<_>>()
476 .into_iter()
477 .collect::<Vec<_>>();
478 let mut map = HashMap::new();
479 for (j, item) in unique.iter().enumerate() {
480 map.insert(item.to_string(), j);
481
482 if j == unique.len() - 1 {
484 map.insert("missing".to_string(), j + 1);
486 }
487 }
488
489 (features[i].to_string(), map)
490 })
491 .collect::<HashMap<_, _>>();
492
493 Ok(FeatureMap {
494 features: feature_map,
495 })
496}
497
498pub fn is_pydantic_basemodel(py: Python, obj: &Bound<'_, PyAny>) -> Result<bool, TypeError> {
506 let pydantic = match py.import("pydantic") {
507 Ok(module) => module,
508 Err(_) => return Ok(false),
510 };
511
512 let basemodel = pydantic.getattr("BaseModel")?;
513
514 let is_basemodel = obj
516 .is_instance(&basemodel)
517 .map_err(|e| TypeError::FailedToCheckPydanticModel(e.to_string()))?;
518
519 Ok(is_basemodel)
520}
521
522pub fn is_pydict(obj: &Bound<'_, PyAny>) -> bool {
523 obj.is_instance_of::<PyDict>()
524}
525
526pub fn pydantic_to_value<'py>(obj: &Bound<'py, PyAny>) -> Result<Value, TypeError> {
529 let dict = obj.call_method0("model_dump")?;
530 pyobject_to_json(&dict)
531}
532
533#[derive(PartialEq, Debug)]
534pub struct ProfileArgs {
535 pub name: String,
536 pub space: String,
537 pub version: Option<String>,
538 pub schedule: String,
539 pub scouter_version: String,
540 pub drift_type: DriftType,
541}
542
543pub trait ProfileBaseArgs {
545 fn get_base_args(&self) -> ProfileArgs;
546 fn to_value(&self) -> serde_json::Value;
547}
548
549pub trait ValidateAlertConfig {
550 fn resolve_schedule(schedule: &str) -> String {
551 let default_schedule = CommonCrons::EveryDay.cron();
552
553 cron::Schedule::from_str(schedule) .map(|_| schedule) .unwrap_or_else(|_| {
556 tracing::error!("Invalid cron schedule, using default schedule");
557 &default_schedule
558 })
559 .to_string()
560 }
561}
562
563#[pyclass(eq, name = "ScouterDataType")]
564#[derive(PartialEq, Debug)]
565pub enum DataType {
566 Pandas,
567 Polars,
568 Numpy,
569 Arrow,
570 Unknown,
571 LLM,
572}
573
574impl Display for DataType {
575 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
576 match self {
577 DataType::Pandas => write!(f, "pandas"),
578 DataType::Polars => write!(f, "polars"),
579 DataType::Numpy => write!(f, "numpy"),
580 DataType::Arrow => write!(f, "arrow"),
581 DataType::Unknown => write!(f, "unknown"),
582 DataType::LLM => write!(f, "llm"),
583 }
584 }
585}
586
587impl DataType {
588 pub fn from_module_name(module_name: &str) -> Result<Self, TypeError> {
589 match module_name {
590 "pandas.core.frame.DataFrame" => Ok(DataType::Pandas),
591 "polars.dataframe.frame.DataFrame" => Ok(DataType::Polars),
592 "numpy.ndarray" => Ok(DataType::Numpy),
593 "pyarrow.lib.Table" => Ok(DataType::Arrow),
594 "scouter_drift.llm.LLMRecord" => Ok(DataType::LLM),
595 _ => Err(TypeError::InvalidDataType),
596 }
597 }
598}
599
600pub fn get_utc_datetime() -> DateTime<Utc> {
601 Utc::now()
602}
603
604#[pyclass(eq)]
605#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
606pub enum AlertThreshold {
607 Below,
608 Above,
609 Outside,
610}
611
612impl Display for AlertThreshold {
613 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
614 write!(f, "{self:?}")
615 }
616}
617
618#[pymethods]
619impl AlertThreshold {
620 #[staticmethod]
621 pub fn from_value(value: &str) -> Option<Self> {
622 match value.to_lowercase().as_str() {
623 "below" => Some(AlertThreshold::Below),
624 "above" => Some(AlertThreshold::Above),
625 "outside" => Some(AlertThreshold::Outside),
626 _ => None,
627 }
628 }
629
630 pub fn __str__(&self) -> String {
631 match self {
632 AlertThreshold::Below => "Below".to_string(),
633 AlertThreshold::Above => "Above".to_string(),
634 AlertThreshold::Outside => "Outside".to_string(),
635 }
636 }
637}
638
639#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Default)]
640pub enum Status {
641 #[default]
642 All,
643 Pending,
644 Processing,
645 Processed,
646 Failed,
647}
648
649impl Status {
650 pub fn as_str(&self) -> Option<&'static str> {
651 match self {
652 Status::All => None,
653 Status::Pending => Some("pending"),
654 Status::Processing => Some("processing"),
655 Status::Processed => Some("processed"),
656 Status::Failed => Some("failed"),
657 }
658 }
659}
660
661impl FromStr for Status {
662 type Err = TypeError;
663
664 fn from_str(s: &str) -> Result<Self, Self::Err> {
665 match s.to_lowercase().as_str() {
666 "all" => Ok(Status::All),
667 "pending" => Ok(Status::Pending),
668 "processing" => Ok(Status::Processing),
669 "processed" => Ok(Status::Processed),
670 "failed" => Ok(Status::Failed),
671 _ => Err(TypeError::InvalidStatusError(s.to_string())),
672 }
673 }
674}
675
676impl Display for Status {
677 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
678 match self {
679 Status::All => write!(f, "all"),
680 Status::Pending => write!(f, "pending"),
681 Status::Processing => write!(f, "processing"),
682 Status::Processed => write!(f, "processed"),
683 Status::Failed => write!(f, "failed"),
684 }
685 }
686}
687
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal implementor that relies on the trait's default method.
    pub struct TestStruct;
    impl ValidateAlertConfig for TestStruct {}

    #[test]
    fn test_resolve_schedule_base() {
        // A schedule that parses is returned unchanged.
        let valid_schedule = "0 0 5 * * *";
        assert_eq!(TestStruct::resolve_schedule(valid_schedule), valid_schedule);

        // A schedule that does not parse falls back to the every-day default.
        let expected_default = CommonCrons::EveryDay.cron();
        assert_eq!(
            TestStruct::resolve_schedule("invalid_cron"),
            expected_default
        );
    }
}