1use crate::error::{ProfileError, TypeError, UtilError};
2use crate::FeatureMap;
3use crate::{CommonCrons, DriftType};
4use base64::prelude::*;
5use chrono::{DateTime, Utc};
6use colored_json::{Color, ColorMode, ColoredFormatter, PrettyFormatter, Styler};
7use opentelemetry::Key;
8use opentelemetry::KeyValue;
9use opentelemetry::Value as OTelValue;
10use opentelemetry_proto::tonic::common::v1::{any_value::Value as AnyValueVariant, AnyValue};
11use pyo3::exceptions::PyRuntimeError;
12use pyo3::prelude::*;
13use pyo3::types::{PyBool, PyBytes, PyDict, PyFloat, PyInt, PyList, PyString, PyTuple};
14use pyo3::IntoPyObjectExt;
15use pythonize::depythonize;
16use rayon::prelude::*;
17use serde::{Deserialize, Serialize};
18use serde_json::{json, Value};
19use std::collections::{BTreeSet, HashMap};
20use std::fmt::{Display, Formatter};
21use std::path::PathBuf;
22use std::str::FromStr;
23
24pub const MISSING: &str = "__missing__";
25pub const DEFAULT_VERSION: &str = "0.0.0";
26
27pub fn scouter_version() -> String {
28 env!("CARGO_PKG_VERSION").to_string()
29}
30
/// Well-known JSON file names used when persisting profiles and drift maps.
pub enum FileName {
    SpcDriftMap,
    SpcDriftProfile,
    PsiDriftMap,
    PsiDriftProfile,
    CustomDriftProfile,
    DriftProfile,
    DataProfile,
    GenAIEvalProfile,
}

impl FileName {
    /// Returns the file name, including its `.json` extension, for this artifact.
    ///
    /// Note that `GenAIEvalProfile` maps to `genai_drift_profile.json`.
    pub fn to_str(&self) -> &'static str {
        match self {
            Self::SpcDriftProfile => "spc_drift_profile.json",
            Self::SpcDriftMap => "spc_drift_map.json",
            Self::PsiDriftProfile => "psi_drift_profile.json",
            Self::PsiDriftMap => "psi_drift_map.json",
            Self::CustomDriftProfile => "custom_drift_profile.json",
            Self::DriftProfile => "drift_profile.json",
            Self::DataProfile => "data_profile.json",
            Self::GenAIEvalProfile => "genai_drift_profile.json",
        }
    }
}
56
57pub struct PyHelperFuncs {}
58
59impl PyHelperFuncs {
60 pub fn __str__<T: Serialize>(object: T) -> String {
61 match ColoredFormatter::with_styler(
62 PrettyFormatter::default(),
63 Styler {
64 key: Color::Rgb(245, 77, 85).bold(),
65 string_value: Color::Rgb(249, 179, 93).foreground(),
66 float_value: Color::Rgb(249, 179, 93).foreground(),
67 integer_value: Color::Rgb(249, 179, 93).foreground(),
68 bool_value: Color::Rgb(249, 179, 93).foreground(),
69 nil_value: Color::Rgb(249, 179, 93).foreground(),
70 ..Default::default()
71 },
72 )
73 .to_colored_json(&object, ColorMode::On)
74 {
75 Ok(json) => json,
76 Err(e) => format!("Failed to serialize to json: {e}"),
77 }
78 }
80
81 pub fn __json__<T: Serialize>(object: T) -> String {
82 match serde_json::to_string_pretty(&object) {
83 Ok(json) => json,
84 Err(e) => format!("Failed to serialize to json: {e}"),
85 }
86 }
87
88 pub fn save_to_json<T>(
89 model: T,
90 path: Option<PathBuf>,
91 filename: &str,
92 ) -> Result<PathBuf, UtilError>
93 where
94 T: Serialize,
95 {
96 let json = serde_json::to_string_pretty(&model)?;
98
99 let write_path = if path.is_some() {
101 let mut new_path = path.ok_or(UtilError::CreatePathError)?;
102
103 new_path.set_extension("json");
105
106 if !new_path.exists() {
107 let parent_path = new_path.parent().ok_or(UtilError::GetParentPathError)?;
109
110 std::fs::create_dir_all(parent_path)
111 .map_err(|_| UtilError::CreateDirectoryError)?;
112 }
113
114 new_path
115 } else {
116 PathBuf::from(filename)
117 };
118
119 std::fs::write(&write_path, json)?;
120
121 Ok(write_path)
122 }
123}
124
125pub fn json_to_pyobject(py: Python, value: &Value, dict: &Bound<'_, PyDict>) -> PyResult<()> {
126 match value {
127 Value::Object(map) => {
128 for (k, v) in map {
129 let py_value = match v {
130 Value::Null => py.None(),
131 Value::Bool(b) => b.into_py_any(py).unwrap(),
132 Value::Number(n) => {
133 if let Some(i) = n.as_i64() {
134 i.into_py_any(py).unwrap()
135 } else if let Some(f) = n.as_f64() {
136 f.into_py_any(py).unwrap()
137 } else {
138 return Err(PyRuntimeError::new_err(
139 "Invalid number type, expected i64 or f64",
140 ));
141 }
142 }
143 Value::String(s) => s.into_py_any(py).unwrap(),
144 Value::Array(arr) => {
145 let py_list = PyList::empty(py);
146 for item in arr {
147 let py_item = json_to_pyobject_value(py, item)?;
148 py_list.append(py_item)?;
149 }
150 py_list.into_py_any(py).unwrap()
151 }
152 Value::Object(_) => {
153 let nested_dict = PyDict::new(py);
154 json_to_pyobject(py, v, &nested_dict)?;
155 nested_dict.into_py_any(py).unwrap()
156 }
157 };
158 dict.set_item(k, py_value)?;
159 }
160 }
161 _ => return Err(PyRuntimeError::new_err("Root must be object")),
162 }
163 Ok(())
164}
165
166pub fn json_to_pyobject_value(py: Python, value: &Value) -> PyResult<Py<PyAny>> {
167 Ok(match value {
168 Value::Null => py.None(),
169 Value::Bool(b) => b.into_py_any(py).unwrap(),
170 Value::Number(n) => {
171 if let Some(i) = n.as_i64() {
172 i.into_py_any(py).unwrap()
173 } else if let Some(f) = n.as_f64() {
174 f.into_py_any(py).unwrap()
175 } else {
176 return Err(PyRuntimeError::new_err(
177 "Invalid number type, expected i64 or f64",
178 ));
179 }
180 }
181 Value::String(s) => s.into_py_any(py).unwrap(),
182 Value::Array(arr) => {
183 let py_list = PyList::empty(py);
184 for item in arr {
185 let py_item = json_to_pyobject_value(py, item)?;
186 py_list.append(py_item)?;
187 }
188 py_list.into_py_any(py).unwrap()
189 }
190 Value::Object(_) => {
191 let nested_dict = PyDict::new(py);
192 json_to_pyobject(py, value, &nested_dict)?;
193 nested_dict.into_py_any(py).unwrap()
194 }
195 })
196}
197
198pub fn pyobject_to_json(obj: &Bound<'_, PyAny>) -> Result<Value, TypeError> {
199 if obj.is_instance_of::<PyDict>() {
200 let dict = obj.cast::<PyDict>()?;
201 let mut map = serde_json::Map::new();
202 for (key, value) in dict.iter() {
203 let key_str = key.extract::<String>()?;
204 let json_value = pyobject_to_json(&value)?;
205 map.insert(key_str, json_value);
206 }
207 Ok(Value::Object(map))
208 } else if obj.is_instance_of::<PyList>() {
209 let list = obj.cast::<PyList>()?;
210 let mut vec = Vec::new();
211 for item in list.iter() {
212 vec.push(pyobject_to_json(&item)?);
213 }
214 Ok(Value::Array(vec))
215 } else if obj.is_instance_of::<PyTuple>() {
216 let tuple = obj.cast::<PyTuple>()?;
217 let mut vec = Vec::new();
218 for item in tuple.iter() {
219 vec.push(pyobject_to_json(&item)?);
220 }
221 Ok(Value::Array(vec))
222 } else if obj.is_instance_of::<PyBytes>() {
223 let bytes = obj.cast::<PyBytes>()?;
224 let b64_string = BASE64_STANDARD.encode(bytes.as_bytes());
225 Ok(Value::String(b64_string))
226 } else if obj.is_instance_of::<PyString>() {
227 let s = obj.extract::<String>()?;
228 Ok(Value::String(s))
229 } else if obj.is_instance_of::<PyFloat>() {
230 let f = obj.extract::<f64>()?;
231 Ok(json!(f))
232 } else if obj.is_instance_of::<PyBool>() {
233 let b = obj.extract::<bool>()?;
234 Ok(json!(b))
235 } else if obj.is_instance_of::<PyInt>() {
236 let i = obj.extract::<i64>()?;
237 Ok(json!(i))
238 } else if obj.is_none() {
239 Ok(Value::Null)
240 } else {
241 Err(TypeError::UnsupportedPyObjectType)
242 }
243}
244
245pub fn pyobject_to_tracing_json(
248 obj: &Bound<'_, PyAny>,
249 max_length: &usize,
250) -> Result<Value, TypeError> {
251 let py = obj.py();
253
254 if is_pydantic_basemodel(py, obj)? {
255 let dict = obj.call_method0("model_dump")?;
256 return pyobject_to_tracing_json(&dict, max_length);
257 }
258 if obj.is_instance_of::<PyDict>() {
259 let dict = obj.cast::<PyDict>()?;
260 let mut map = serde_json::Map::new();
261 for (key, value) in dict.iter() {
262 let key = pyobject_to_tracing_json(&key, max_length)?;
263 let key_str = match key {
265 Value::String(s) => s,
266 Value::Number(n) => n.to_string(),
267 Value::Bool(b) => b.to_string(),
268 _ => return Err(TypeError::InvalidDictKeyType),
269 };
270 let json_value = pyobject_to_tracing_json(&value, max_length)?;
271 map.insert(key_str.to_string(), json_value);
272 }
273 Ok(Value::Object(map))
274 } else if obj.is_instance_of::<PyList>() {
275 let list = obj.cast::<PyList>()?;
276 let mut vec = Vec::new();
277 for item in list.iter() {
278 vec.push(pyobject_to_tracing_json(&item, max_length)?);
279 }
280 Ok(Value::Array(vec))
281 } else if obj.is_instance_of::<PyTuple>() {
282 let tuple = obj.cast::<PyTuple>()?;
283 let mut vec = Vec::new();
284 for item in tuple.iter() {
285 vec.push(pyobject_to_tracing_json(&item, max_length)?);
286 }
287 Ok(Value::Array(vec))
288 } else if obj.is_instance_of::<PyBytes>() {
289 let bytes = obj.cast::<PyBytes>()?;
290 let b64_string = BASE64_STANDARD.encode(bytes.as_bytes());
291 Ok(Value::String(b64_string))
292 } else if obj.is_instance_of::<PyString>() {
293 let s = obj.extract::<String>()?;
294 let truncated = if s.len() > *max_length {
295 format!("{}...[truncated]", &s[..*max_length])
296 } else {
297 s
298 };
299 Ok(Value::String(truncated))
300 } else if obj.is_instance_of::<PyFloat>() {
301 let f = obj.extract::<f64>()?;
302 Ok(json!(f))
303 } else if obj.is_instance_of::<PyBool>() {
304 let b = obj.extract::<bool>()?;
305 Ok(json!(b))
306 } else if obj.is_instance_of::<PyInt>() {
307 let i = obj.extract::<i64>()?;
308 Ok(json!(i))
309 } else if obj.is_none() {
310 Ok(Value::Null)
311 } else {
312 let ty = match obj.get_type().name() {
315 Ok(name) => name.to_string(),
316 Err(_) => "unknown".to_string(),
317 };
318
319 Ok(Value::String(ty))
320 }
321}
322
323pub fn pyobject_to_otel_value(obj: &Bound<'_, PyAny>) -> Result<OTelValue, TypeError> {
326 let value: Value = depythonize(obj)?;
327 Ok(serde_value_to_otel_value(&value))
328}
329
330fn serde_value_to_otel_value(value: &Value) -> OTelValue {
333 match value {
334 Value::Bool(b) => OTelValue::Bool(*b),
335 Value::Number(n) => {
336 if let Some(i) = n.as_i64() {
337 OTelValue::I64(i)
338 } else if let Some(f) = n.as_f64() {
339 OTelValue::F64(f)
340 } else {
341 OTelValue::String(opentelemetry::StringValue::from(n.to_string()))
342 }
343 }
344 Value::String(s) => OTelValue::String(opentelemetry::StringValue::from(s.clone())),
345 Value::Array(arr) => {
346 if let Some(array) = try_homogeneous_array(arr) {
348 OTelValue::Array(array)
349 } else {
350 let strings: Vec<opentelemetry::StringValue> = arr
352 .iter()
353 .map(|v| opentelemetry::StringValue::from(v.to_string()))
354 .collect();
355 OTelValue::Array(opentelemetry::Array::String(strings))
356 }
357 }
358 Value::Object(_) => {
359 OTelValue::String(opentelemetry::StringValue::from(value.to_string()))
361 }
362 Value::Null => OTelValue::String(opentelemetry::StringValue::from("null")),
363 }
364}
365
366fn try_homogeneous_array(arr: &[Value]) -> Option<opentelemetry::Array> {
369 if arr.is_empty() {
370 return Some(opentelemetry::Array::String(Vec::new()));
371 }
372
373 match arr.first()? {
375 Value::Bool(_) => {
376 let bools: Option<Vec<bool>> = arr.iter().map(|v| v.as_bool()).collect();
377 bools.map(opentelemetry::Array::Bool)
378 }
379 Value::Number(n) if n.is_i64() => {
380 let ints: Option<Vec<i64>> = arr.iter().map(|v| v.as_i64()).collect();
381 ints.map(opentelemetry::Array::I64)
382 }
383 Value::Number(_) => {
384 let floats: Option<Vec<f64>> = arr.iter().map(|v| v.as_f64()).collect();
385 floats.map(opentelemetry::Array::F64)
386 }
387 Value::String(_) => {
388 let strings: Vec<opentelemetry::StringValue> = arr
389 .iter()
390 .filter_map(|v| v.as_str())
391 .map(|s| opentelemetry::StringValue::from(s.to_string()))
392 .collect();
393 if strings.len() == arr.len() {
394 Some(opentelemetry::Array::String(strings))
395 } else {
396 None
397 }
398 }
399 _ => None, }
401}
402
403pub fn pydict_to_otel_keyvalue(obj: &Bound<'_, PyAny>) -> Result<Vec<KeyValue>, TypeError> {
407 let value: Value = depythonize(obj)?;
408
409 match value {
410 Value::Object(map) => Ok(flatten_json_object(&map, None)),
411 _ => Err(TypeError::ExpectedPyDict),
412 }
413}
414
415fn flatten_json_object(
417 obj: &serde_json::Map<String, Value>,
418 prefix: Option<&str>,
419) -> Vec<KeyValue> {
420 let mut result = Vec::new();
421
422 for (key, value) in obj {
423 let full_key = if let Some(p) = prefix {
424 format!("{}.{}", p, key)
425 } else {
426 key.clone()
427 };
428
429 match value {
430 Value::Object(nested) => {
431 result.extend(flatten_json_object(nested, Some(&full_key)));
433 }
434 _ => {
435 let otel_value = serde_value_to_otel_value(value);
437 result.push(KeyValue::new(Key::new(full_key), otel_value));
438 }
439 }
440 }
441 result
442}
443
444pub fn otel_value_to_serde_value(otel_value: &AnyValue) -> Value {
449 match &otel_value.value {
450 Some(variant) => match variant {
451 AnyValueVariant::BoolValue(b) => Value::Bool(*b),
452 AnyValueVariant::IntValue(i) => Value::Number(serde_json::Number::from(*i)),
453 AnyValueVariant::DoubleValue(d) => serde_json::Number::from_f64(*d)
454 .map(Value::Number)
455 .unwrap_or(Value::Null),
456 AnyValueVariant::StringValue(s) => Value::String(s.clone()),
457 AnyValueVariant::ArrayValue(array) => {
458 let values: Vec<Value> =
459 array.values.iter().map(otel_value_to_serde_value).collect();
460 Value::Array(values)
461 }
462 AnyValueVariant::KvlistValue(kvlist) => {
463 let mut map = serde_json::Map::new();
464 for kv in &kvlist.values {
465 if let Some(value) = &kv.value {
466 map.insert(kv.key.clone(), otel_value_to_serde_value(value));
467 }
468 }
469 Value::Object(map)
470 }
471 AnyValueVariant::BytesValue(bytes) => Value::String(BASE64_STANDARD.encode(bytes)),
472 },
473 None => Value::Null,
474 }
475}
476
477pub fn create_feature_map(
478 features: &[String],
479 array: &[Vec<String>],
480) -> Result<FeatureMap, ProfileError> {
481 if features.len() != array.len() {
483 return Err(ProfileError::FeatureArrayLengthError);
484 };
485
486 let feature_map = array
487 .par_iter()
488 .enumerate()
489 .map(|(i, col)| {
490 let unique = col
491 .iter()
492 .collect::<BTreeSet<_>>()
493 .into_iter()
494 .collect::<Vec<_>>();
495 let mut map = HashMap::new();
496 for (j, item) in unique.iter().enumerate() {
497 map.insert(item.to_string(), j as i32);
498
499 if j == unique.len() - 1 {
501 map.insert("missing".to_string(), j as i32 + 1);
503 }
504 }
505
506 (features[i].to_string(), map)
507 })
508 .collect::<HashMap<_, _>>();
509
510 Ok(FeatureMap {
511 features: feature_map,
512 })
513}
514
515pub fn is_pydantic_basemodel(py: Python, obj: &Bound<'_, PyAny>) -> Result<bool, TypeError> {
523 let pydantic = match py.import("pydantic") {
524 Ok(module) => module,
525 Err(_) => return Ok(false),
527 };
528
529 let basemodel = pydantic.getattr("BaseModel")?;
530
531 let is_basemodel = obj
533 .is_instance(&basemodel)
534 .map_err(|e| TypeError::FailedToCheckPydanticModel(e.to_string()))?;
535
536 Ok(is_basemodel)
537}
538
539pub fn is_pydict(obj: &Bound<'_, PyAny>) -> bool {
540 obj.is_instance_of::<PyDict>()
541}
542
543pub fn pydantic_to_value<'py>(obj: &Bound<'py, PyAny>) -> Result<Value, TypeError> {
546 let dict = obj.call_method0("model_dump")?;
547 pyobject_to_json(&dict)
548}
549
550fn process_dict_with_nested_models(
551 py: Python<'_>,
552 dict: &Bound<'_, PyAny>,
553) -> Result<Value, TypeError> {
554 let py_dict = dict.cast::<PyDict>()?;
555 let mut result = serde_json::Map::new();
556
557 for (key, value) in py_dict.iter() {
558 let key_str: String = key.extract()?;
559 let processed_value = depythonize_object_to_value(py, &value)?;
560 result.insert(key_str, processed_value);
561 }
562
563 Ok(Value::Object(result))
564}
565
566pub fn depythonize_object_to_value<'py>(
567 py: Python<'py>,
568 value: &Bound<'py, PyAny>,
569) -> Result<Value, TypeError> {
570 let py_value = if is_pydantic_basemodel(py, value)? {
571 let model = value.call_method0("model_dump")?;
572 depythonize(&model)?
573 } else if value.is_instance_of::<PyDict>() {
574 process_dict_with_nested_models(py, value)?
575 } else {
576 depythonize(value)?
577 };
578 Ok(py_value)
579}
580
581#[derive(PartialEq, Debug)]
582pub struct ProfileArgs {
583 pub name: String,
584 pub space: String,
585 pub version: Option<String>,
586 pub schedule: String,
587 pub scouter_version: String,
588 pub drift_type: DriftType,
589}
590
591pub trait ProfileBaseArgs {
593 type Config: ConfigExt;
594
595 fn config(&self) -> &Self::Config;
596 fn get_base_args(&self) -> ProfileArgs;
597 fn to_value(&self) -> serde_json::Value;
598 fn space(&self) -> &str {
599 self.config().space()
600 }
601 fn name(&self) -> &str {
602 self.config().name()
603 }
604 fn version(&self) -> &str {
605 self.config().version()
606 }
607}
608
/// Minimal identity accessors every profile configuration must provide.
pub trait ConfigExt {
    /// Space the configuration belongs to.
    fn space(&self) -> &str;
    /// Name of the configured profile.
    fn name(&self) -> &str;
    /// Version of the configured profile.
    fn version(&self) -> &str;
}
614
615pub trait ValidateAlertConfig {
616 fn resolve_schedule(schedule: &str) -> String {
617 let default_schedule = CommonCrons::EveryDay.cron();
618
619 cron::Schedule::from_str(schedule) .map(|_| schedule) .unwrap_or_else(|_| {
622 tracing::error!("Invalid cron schedule, using default schedule");
623 &default_schedule
624 })
625 .to_string()
626 }
627}
628
629#[pyclass(eq, name = "ScouterDataType")]
630#[derive(PartialEq, Debug)]
631pub enum DataType {
632 Pandas,
633 Polars,
634 Numpy,
635 Arrow,
636 Unknown,
637 GenAI,
638}
639
640impl Display for DataType {
641 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
642 match self {
643 DataType::Pandas => write!(f, "pandas"),
644 DataType::Polars => write!(f, "polars"),
645 DataType::Numpy => write!(f, "numpy"),
646 DataType::Arrow => write!(f, "arrow"),
647 DataType::Unknown => write!(f, "unknown"),
648 DataType::GenAI => write!(f, "genai"),
649 }
650 }
651}
652
653impl DataType {
654 pub fn from_module_name(module_name: &str) -> Result<Self, TypeError> {
655 match module_name {
656 "pandas.core.frame.DataFrame" => Ok(DataType::Pandas),
657 "polars.dataframe.frame.DataFrame" => Ok(DataType::Polars),
658 "numpy.ndarray" => Ok(DataType::Numpy),
659 "pyarrow.lib.Table" => Ok(DataType::Arrow),
660 "scouter_drift.genai.GenAIEvalRecord" => Ok(DataType::GenAI),
661 _ => Err(TypeError::InvalidDataType),
662 }
663 }
664}
665
666pub fn get_utc_datetime() -> DateTime<Utc> {
667 Utc::now()
668}
669
670#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Default)]
671pub enum Status {
672 #[default]
673 All,
674 Pending,
675 Processing,
676 Processed,
677 Failed,
678}
679
680impl Status {
681 pub fn as_str(&self) -> Option<&'static str> {
682 match self {
683 Status::All => None,
684 Status::Pending => Some("pending"),
685 Status::Processing => Some("processing"),
686 Status::Processed => Some("processed"),
687 Status::Failed => Some("failed"),
688 }
689 }
690}
691
692impl TryFrom<String> for Status {
693 type Error = String;
694
695 fn try_from(s: String) -> Result<Self, Self::Error> {
696 s.parse()
697 }
698}
699
700impl std::str::FromStr for Status {
701 type Err = String;
702
703 fn from_str(s: &str) -> Result<Self, Self::Err> {
704 match s.to_lowercase().as_str() {
705 "all" => Ok(Status::All),
706 "pending" => Ok(Status::Pending),
707 "processing" => Ok(Status::Processing),
708 "processed" => Ok(Status::Processed),
709 "failed" => Ok(Status::Failed),
710 _ => Err(format!("Unknown status: {}", s)),
711 }
712 }
713}
714
715impl Display for Status {
716 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
717 match self {
718 Status::All => write!(f, "all"),
719 Status::Pending => write!(f, "pending"),
720 Status::Processing => write!(f, "processing"),
721 Status::Processed => write!(f, "processed"),
722 Status::Failed => write!(f, "failed"),
723 }
724 }
725}
726
#[cfg(test)]
mod tests {
    use super::*;

    pub struct TestStruct;
    impl ValidateAlertConfig for TestStruct {}

    #[test]
    fn test_resolve_schedule_base() {
        // A well-formed six-field cron expression is returned unchanged.
        assert_eq!(
            TestStruct::resolve_schedule("0 0 5 * * *"),
            "0 0 5 * * *".to_string()
        );

        // Anything that fails to parse falls back to the daily default.
        let expected_default = CommonCrons::EveryDay.cron();
        assert_eq!(
            TestStruct::resolve_schedule("invalid_cron"),
            expected_default
        );
    }
}