1use crate::error::{ProfileError, TypeError, UtilError};
2use crate::traits::ConfigExt;
3use crate::FeatureMap;
4use crate::{CommonCrons, DriftType};
5use base64::prelude::*;
6use chrono::{DateTime, Utc};
7use colored_json::{Color, ColorMode, ColoredFormatter, PrettyFormatter, Styler};
8use opentelemetry::Key;
9use opentelemetry::KeyValue;
10use opentelemetry::Value as OTelValue;
11use opentelemetry_proto::tonic::common::v1::{any_value::Value as AnyValueVariant, AnyValue};
12use pyo3::exceptions::PyRuntimeError;
13use pyo3::prelude::*;
14use pyo3::types::{PyBool, PyBytes, PyDict, PyFloat, PyInt, PyList, PyString, PyTuple};
15use pyo3::IntoPyObjectExt;
16use pythonize::depythonize;
17use rayon::prelude::*;
18use serde::{Deserialize, Serialize};
19use serde_json::{json, Value};
20use std::collections::{BTreeSet, HashMap};
21use std::fmt::{Display, Formatter};
22use std::path::PathBuf;
23use std::str::FromStr;
24
/// Sentinel string `"__missing__"`.
///
/// NOTE(review): presumably stands in for an absent value — confirm against callers.
pub const MISSING: &str = "__missing__";

/// Version string `"0.0.0"`.
///
/// NOTE(review): presumably the fallback used when no explicit version is given — confirm.
pub const DEFAULT_VERSION: &str = "0.0.0";
27
28pub fn scouter_version() -> String {
29 env!("CARGO_PKG_VERSION").to_string()
30}
31
/// Kinds of JSON artifact for which this module knows a fixed file name.
pub enum FileName {
    SpcDriftMap,
    SpcDriftProfile,
    PsiDriftMap,
    PsiDriftProfile,
    CustomDriftProfile,
    DriftProfile,
    DataProfile,
    GenAIEvalProfile,
}

impl FileName {
    /// Returns the file name (including the `.json` extension) associated with this variant.
    pub fn to_str(&self) -> &'static str {
        match self {
            Self::SpcDriftMap => "spc_drift_map.json",
            Self::SpcDriftProfile => "spc_drift_profile.json",
            Self::PsiDriftMap => "psi_drift_map.json",
            Self::PsiDriftProfile => "psi_drift_profile.json",
            Self::CustomDriftProfile => "custom_drift_profile.json",
            Self::DriftProfile => "drift_profile.json",
            Self::DataProfile => "data_profile.json",
            Self::GenAIEvalProfile => "genai_drift_profile.json",
        }
    }
}
57
58pub struct PyHelperFuncs {}
59
60impl PyHelperFuncs {
61 pub fn __str__<T: Serialize>(object: T) -> String {
62 match ColoredFormatter::with_styler(
63 PrettyFormatter::default(),
64 Styler {
65 key: Color::Rgb(245, 77, 85).bold(),
66 string_value: Color::Rgb(249, 179, 93).foreground(),
67 float_value: Color::Rgb(249, 179, 93).foreground(),
68 integer_value: Color::Rgb(249, 179, 93).foreground(),
69 bool_value: Color::Rgb(249, 179, 93).foreground(),
70 nil_value: Color::Rgb(249, 179, 93).foreground(),
71 ..Default::default()
72 },
73 )
74 .to_colored_json(&object, ColorMode::On)
75 {
76 Ok(json) => json,
77 Err(e) => format!("Failed to serialize to json: {e}"),
78 }
79 }
81
82 pub fn __json__<T: Serialize>(object: T) -> String {
83 match serde_json::to_string_pretty(&object) {
84 Ok(json) => json,
85 Err(e) => format!("Failed to serialize to json: {e}"),
86 }
87 }
88
89 pub fn save_to_json<T>(
90 model: T,
91 path: Option<PathBuf>,
92 filename: &str,
93 ) -> Result<PathBuf, UtilError>
94 where
95 T: Serialize,
96 {
97 let json = serde_json::to_string_pretty(&model)?;
99
100 let write_path = if path.is_some() {
102 let mut new_path = path.ok_or(UtilError::CreatePathError)?;
103
104 new_path.set_extension("json");
106
107 if !new_path.exists() {
108 let parent_path = new_path.parent().ok_or(UtilError::GetParentPathError)?;
110
111 std::fs::create_dir_all(parent_path)
112 .map_err(|_| UtilError::CreateDirectoryError)?;
113 }
114
115 new_path
116 } else {
117 PathBuf::from(filename)
118 };
119
120 std::fs::write(&write_path, json)?;
121
122 Ok(write_path)
123 }
124}
125
126pub fn json_to_pyobject(py: Python, value: &Value, dict: &Bound<'_, PyDict>) -> PyResult<()> {
127 match value {
128 Value::Object(map) => {
129 for (k, v) in map {
130 let py_value = match v {
131 Value::Null => py.None(),
132 Value::Bool(b) => b.into_py_any(py).unwrap(),
133 Value::Number(n) => {
134 if let Some(i) = n.as_i64() {
135 i.into_py_any(py).unwrap()
136 } else if let Some(f) = n.as_f64() {
137 f.into_py_any(py).unwrap()
138 } else {
139 return Err(PyRuntimeError::new_err(
140 "Invalid number type, expected i64 or f64",
141 ));
142 }
143 }
144 Value::String(s) => s.into_py_any(py).unwrap(),
145 Value::Array(arr) => {
146 let py_list = PyList::empty(py);
147 for item in arr {
148 let py_item = json_to_pyobject_value(py, item)?;
149 py_list.append(py_item)?;
150 }
151 py_list.into_py_any(py).unwrap()
152 }
153 Value::Object(_) => {
154 let nested_dict = PyDict::new(py);
155 json_to_pyobject(py, v, &nested_dict)?;
156 nested_dict.into_py_any(py).unwrap()
157 }
158 };
159 dict.set_item(k, py_value)?;
160 }
161 }
162 _ => return Err(PyRuntimeError::new_err("Root must be object")),
163 }
164 Ok(())
165}
166
167pub fn json_to_pyobject_value(py: Python, value: &Value) -> PyResult<Py<PyAny>> {
168 Ok(match value {
169 Value::Null => py.None(),
170 Value::Bool(b) => b.into_py_any(py).unwrap(),
171 Value::Number(n) => {
172 if let Some(i) = n.as_i64() {
173 i.into_py_any(py).unwrap()
174 } else if let Some(f) = n.as_f64() {
175 f.into_py_any(py).unwrap()
176 } else {
177 return Err(PyRuntimeError::new_err(
178 "Invalid number type, expected i64 or f64",
179 ));
180 }
181 }
182 Value::String(s) => s.into_py_any(py).unwrap(),
183 Value::Array(arr) => {
184 let py_list = PyList::empty(py);
185 for item in arr {
186 let py_item = json_to_pyobject_value(py, item)?;
187 py_list.append(py_item)?;
188 }
189 py_list.into_py_any(py).unwrap()
190 }
191 Value::Object(_) => {
192 let nested_dict = PyDict::new(py);
193 json_to_pyobject(py, value, &nested_dict)?;
194 nested_dict.into_py_any(py).unwrap()
195 }
196 })
197}
198
199pub fn pyobject_to_json(obj: &Bound<'_, PyAny>) -> Result<Value, TypeError> {
200 if obj.is_instance_of::<PyDict>() {
201 let dict = obj.cast::<PyDict>()?;
202 let mut map = serde_json::Map::new();
203 for (key, value) in dict.iter() {
204 let key_str = key.extract::<String>()?;
205 let json_value = pyobject_to_json(&value)?;
206 map.insert(key_str, json_value);
207 }
208 Ok(Value::Object(map))
209 } else if obj.is_instance_of::<PyList>() {
210 let list = obj.cast::<PyList>()?;
211 let mut vec = Vec::new();
212 for item in list.iter() {
213 vec.push(pyobject_to_json(&item)?);
214 }
215 Ok(Value::Array(vec))
216 } else if obj.is_instance_of::<PyTuple>() {
217 let tuple = obj.cast::<PyTuple>()?;
218 let mut vec = Vec::new();
219 for item in tuple.iter() {
220 vec.push(pyobject_to_json(&item)?);
221 }
222 Ok(Value::Array(vec))
223 } else if obj.is_instance_of::<PyBytes>() {
224 let bytes = obj.cast::<PyBytes>()?;
225 let b64_string = BASE64_STANDARD.encode(bytes.as_bytes());
226 Ok(Value::String(b64_string))
227 } else if obj.is_instance_of::<PyString>() {
228 let s = obj.extract::<String>()?;
229 Ok(Value::String(s))
230 } else if obj.is_instance_of::<PyFloat>() {
231 let f = obj.extract::<f64>()?;
232 Ok(json!(f))
233 } else if obj.is_instance_of::<PyBool>() {
234 let b = obj.extract::<bool>()?;
235 Ok(json!(b))
236 } else if obj.is_instance_of::<PyInt>() {
237 let i = obj.extract::<i64>()?;
238 Ok(json!(i))
239 } else if obj.is_none() {
240 Ok(Value::Null)
241 } else {
242 Err(TypeError::UnsupportedPyObjectType)
243 }
244}
245
246pub fn pyobject_to_tracing_json(
249 obj: &Bound<'_, PyAny>,
250 max_length: &usize,
251) -> Result<Value, TypeError> {
252 let py = obj.py();
254
255 if is_pydantic_basemodel(py, obj)? {
256 let dict = obj.call_method0("model_dump")?;
257 return pyobject_to_tracing_json(&dict, max_length);
258 }
259 if obj.is_instance_of::<PyDict>() {
260 let dict = obj.cast::<PyDict>()?;
261 let mut map = serde_json::Map::new();
262 for (key, value) in dict.iter() {
263 let key = pyobject_to_tracing_json(&key, max_length)?;
264 let key_str = match key {
266 Value::String(s) => s,
267 Value::Number(n) => n.to_string(),
268 Value::Bool(b) => b.to_string(),
269 _ => return Err(TypeError::InvalidDictKeyType),
270 };
271 let json_value = pyobject_to_tracing_json(&value, max_length)?;
272 map.insert(key_str.to_string(), json_value);
273 }
274 Ok(Value::Object(map))
275 } else if obj.is_instance_of::<PyList>() {
276 let list = obj.cast::<PyList>()?;
277 let mut vec = Vec::new();
278 for item in list.iter() {
279 vec.push(pyobject_to_tracing_json(&item, max_length)?);
280 }
281 Ok(Value::Array(vec))
282 } else if obj.is_instance_of::<PyTuple>() {
283 let tuple = obj.cast::<PyTuple>()?;
284 let mut vec = Vec::new();
285 for item in tuple.iter() {
286 vec.push(pyobject_to_tracing_json(&item, max_length)?);
287 }
288 Ok(Value::Array(vec))
289 } else if obj.is_instance_of::<PyBytes>() {
290 let bytes = obj.cast::<PyBytes>()?;
291 let b64_string = BASE64_STANDARD.encode(bytes.as_bytes());
292 Ok(Value::String(b64_string))
293 } else if obj.is_instance_of::<PyString>() {
294 let s = obj.extract::<String>()?;
295 let truncated = if s.len() > *max_length {
296 format!("{}...[truncated]", &s[..*max_length])
297 } else {
298 s
299 };
300 Ok(Value::String(truncated))
301 } else if obj.is_instance_of::<PyFloat>() {
302 let f = obj.extract::<f64>()?;
303 Ok(json!(f))
304 } else if obj.is_instance_of::<PyBool>() {
305 let b = obj.extract::<bool>()?;
306 Ok(json!(b))
307 } else if obj.is_instance_of::<PyInt>() {
308 let i = obj.extract::<i64>()?;
309 Ok(json!(i))
310 } else if obj.is_none() {
311 Ok(Value::Null)
312 } else {
313 let ty = match obj.get_type().name() {
316 Ok(name) => name.to_string(),
317 Err(_) => "unknown".to_string(),
318 };
319
320 Ok(Value::String(ty))
321 }
322}
323
324pub fn pyobject_to_otel_value(obj: &Bound<'_, PyAny>) -> Result<OTelValue, TypeError> {
327 let value: Value = depythonize(obj)?;
328 Ok(serde_value_to_otel_value(&value))
329}
330
331fn serde_value_to_otel_value(value: &Value) -> OTelValue {
334 match value {
335 Value::Bool(b) => OTelValue::Bool(*b),
336 Value::Number(n) => {
337 if let Some(i) = n.as_i64() {
338 OTelValue::I64(i)
339 } else if let Some(f) = n.as_f64() {
340 OTelValue::F64(f)
341 } else {
342 OTelValue::String(opentelemetry::StringValue::from(n.to_string()))
343 }
344 }
345 Value::String(s) => OTelValue::String(opentelemetry::StringValue::from(s.clone())),
346 Value::Array(arr) => {
347 if let Some(array) = try_homogeneous_array(arr) {
349 OTelValue::Array(array)
350 } else {
351 let strings: Vec<opentelemetry::StringValue> = arr
353 .iter()
354 .map(|v| opentelemetry::StringValue::from(v.to_string()))
355 .collect();
356 OTelValue::Array(opentelemetry::Array::String(strings))
357 }
358 }
359 Value::Object(_) => {
360 OTelValue::String(opentelemetry::StringValue::from(value.to_string()))
362 }
363 Value::Null => OTelValue::String(opentelemetry::StringValue::from("null")),
364 }
365}
366
367fn try_homogeneous_array(arr: &[Value]) -> Option<opentelemetry::Array> {
370 if arr.is_empty() {
371 return Some(opentelemetry::Array::String(Vec::new()));
372 }
373
374 match arr.first()? {
376 Value::Bool(_) => {
377 let bools: Option<Vec<bool>> = arr.iter().map(|v| v.as_bool()).collect();
378 bools.map(opentelemetry::Array::Bool)
379 }
380 Value::Number(n) if n.is_i64() => {
381 let ints: Option<Vec<i64>> = arr.iter().map(|v| v.as_i64()).collect();
382 ints.map(opentelemetry::Array::I64)
383 }
384 Value::Number(_) => {
385 let floats: Option<Vec<f64>> = arr.iter().map(|v| v.as_f64()).collect();
386 floats.map(opentelemetry::Array::F64)
387 }
388 Value::String(_) => {
389 let strings: Vec<opentelemetry::StringValue> = arr
390 .iter()
391 .filter_map(|v| v.as_str())
392 .map(|s| opentelemetry::StringValue::from(s.to_string()))
393 .collect();
394 if strings.len() == arr.len() {
395 Some(opentelemetry::Array::String(strings))
396 } else {
397 None
398 }
399 }
400 _ => None, }
402}
403
404pub fn pydict_to_otel_keyvalue(obj: &Bound<'_, PyAny>) -> Result<Vec<KeyValue>, TypeError> {
408 let value: Value = depythonize(obj)?;
409
410 match value {
411 Value::Object(map) => Ok(flatten_json_object(&map, None)),
412 _ => Err(TypeError::ExpectedPyDict),
413 }
414}
415
416fn flatten_json_object(
418 obj: &serde_json::Map<String, Value>,
419 prefix: Option<&str>,
420) -> Vec<KeyValue> {
421 let mut result = Vec::new();
422
423 for (key, value) in obj {
424 let full_key = if let Some(p) = prefix {
425 format!("{}.{}", p, key)
426 } else {
427 key.clone()
428 };
429
430 match value {
431 Value::Object(nested) => {
432 result.extend(flatten_json_object(nested, Some(&full_key)));
434 }
435 _ => {
436 let otel_value = serde_value_to_otel_value(value);
438 result.push(KeyValue::new(Key::new(full_key), otel_value));
439 }
440 }
441 }
442 result
443}
444
445pub fn otel_value_to_serde_value(otel_value: &AnyValue) -> Value {
450 match &otel_value.value {
451 Some(variant) => match variant {
452 AnyValueVariant::BoolValue(b) => Value::Bool(*b),
453 AnyValueVariant::IntValue(i) => Value::Number(serde_json::Number::from(*i)),
454 AnyValueVariant::DoubleValue(d) => serde_json::Number::from_f64(*d)
455 .map(Value::Number)
456 .unwrap_or(Value::Null),
457 AnyValueVariant::StringValue(s) => Value::String(s.clone()),
458 AnyValueVariant::ArrayValue(array) => {
459 let values: Vec<Value> =
460 array.values.iter().map(otel_value_to_serde_value).collect();
461 Value::Array(values)
462 }
463 AnyValueVariant::KvlistValue(kvlist) => {
464 let mut map = serde_json::Map::new();
465 for kv in &kvlist.values {
466 if let Some(value) = &kv.value {
467 map.insert(kv.key.clone(), otel_value_to_serde_value(value));
468 }
469 }
470 Value::Object(map)
471 }
472 AnyValueVariant::BytesValue(bytes) => Value::String(BASE64_STANDARD.encode(bytes)),
473 },
474 None => Value::Null,
475 }
476}
477
478pub fn create_feature_map(
479 features: &[String],
480 array: &[Vec<String>],
481) -> Result<FeatureMap, ProfileError> {
482 if features.len() != array.len() {
484 return Err(ProfileError::FeatureArrayLengthError);
485 };
486
487 let feature_map = array
488 .par_iter()
489 .enumerate()
490 .map(|(i, col)| {
491 let unique = col
492 .iter()
493 .collect::<BTreeSet<_>>()
494 .into_iter()
495 .collect::<Vec<_>>();
496 let mut map = HashMap::new();
497 for (j, item) in unique.iter().enumerate() {
498 map.insert(item.to_string(), j as i32);
499
500 if j == unique.len() - 1 {
502 map.insert("missing".to_string(), j as i32 + 1);
504 }
505 }
506
507 (features[i].to_string(), map)
508 })
509 .collect::<HashMap<_, _>>();
510
511 Ok(FeatureMap {
512 features: feature_map,
513 })
514}
515
516pub fn is_pydantic_basemodel(py: Python, obj: &Bound<'_, PyAny>) -> Result<bool, TypeError> {
524 let pydantic = match py.import("pydantic") {
525 Ok(module) => module,
526 Err(_) => return Ok(false),
528 };
529
530 let basemodel = pydantic.getattr("BaseModel")?;
531
532 let is_basemodel = obj
534 .is_instance(&basemodel)
535 .map_err(|e| TypeError::FailedToCheckPydanticModel(e.to_string()))?;
536
537 Ok(is_basemodel)
538}
539
540pub fn is_pydict(obj: &Bound<'_, PyAny>) -> bool {
541 obj.is_instance_of::<PyDict>()
542}
543
544pub fn pydantic_to_value<'py>(obj: &Bound<'py, PyAny>) -> Result<Value, TypeError> {
547 let dict = obj.call_method0("model_dump")?;
548 pyobject_to_json(&dict)
549}
550
551fn process_dict_with_nested_models(
552 py: Python<'_>,
553 dict: &Bound<'_, PyAny>,
554) -> Result<Value, TypeError> {
555 let py_dict = dict.cast::<PyDict>()?;
556 let mut result = serde_json::Map::new();
557
558 for (key, value) in py_dict.iter() {
559 let key_str: String = key.extract()?;
560 let processed_value = depythonize_object_to_value(py, &value)?;
561 result.insert(key_str, processed_value);
562 }
563
564 Ok(Value::Object(result))
565}
566
567pub fn depythonize_object_to_value<'py>(
568 py: Python<'py>,
569 value: &Bound<'py, PyAny>,
570) -> Result<Value, TypeError> {
571 let py_value = if is_pydantic_basemodel(py, value)? {
572 let model = value.call_method0("model_dump")?;
573 depythonize(&model)?
574 } else if value.is_instance_of::<PyDict>() {
575 process_dict_with_nested_models(py, value)?
576 } else {
577 depythonize(value)?
578 };
579 Ok(py_value)
580}
581
582#[derive(PartialEq, Debug)]
583pub struct ProfileArgs {
584 pub name: String,
585 pub space: String,
586 pub version: Option<String>,
587 pub schedule: String,
588 pub scouter_version: String,
589 pub drift_type: DriftType,
590}
591
592pub trait ProfileBaseArgs {
594 type Config: ConfigExt;
595
596 fn config(&self) -> &Self::Config;
597 fn get_base_args(&self) -> ProfileArgs;
598 fn to_value(&self) -> serde_json::Value;
599 fn space(&self) -> &str {
600 self.config().space()
601 }
602 fn name(&self) -> &str {
603 self.config().name()
604 }
605 fn version(&self) -> &str {
606 self.config().version()
607 }
608}
609
610pub trait ValidateAlertConfig {
611 fn resolve_schedule(schedule: &str) -> String {
612 let default_schedule = CommonCrons::EveryDay.cron();
613
614 cron::Schedule::from_str(schedule) .map(|_| schedule) .unwrap_or_else(|_| {
617 tracing::error!("Invalid cron schedule, using default schedule");
618 &default_schedule
619 })
620 .to_string()
621 }
622}
623
624#[pyclass(eq, name = "ScouterDataType")]
625#[derive(PartialEq, Debug)]
626pub enum DataType {
627 Pandas,
628 Polars,
629 Numpy,
630 Arrow,
631 Unknown,
632 GenAI,
633}
634
635impl Display for DataType {
636 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
637 match self {
638 DataType::Pandas => write!(f, "pandas"),
639 DataType::Polars => write!(f, "polars"),
640 DataType::Numpy => write!(f, "numpy"),
641 DataType::Arrow => write!(f, "arrow"),
642 DataType::Unknown => write!(f, "unknown"),
643 DataType::GenAI => write!(f, "genai"),
644 }
645 }
646}
647
648impl DataType {
649 pub fn from_module_name(module_name: &str) -> Result<Self, TypeError> {
650 match module_name {
651 "pandas.core.frame.DataFrame" => Ok(DataType::Pandas),
652 "polars.dataframe.frame.DataFrame" => Ok(DataType::Polars),
653 "numpy.ndarray" => Ok(DataType::Numpy),
654 "pyarrow.lib.Table" => Ok(DataType::Arrow),
655 "scouter_drift.genai.GenAIEvalRecord" => Ok(DataType::GenAI),
656 _ => Err(TypeError::InvalidDataType),
657 }
658 }
659}
660
661pub fn get_utc_datetime() -> DateTime<Utc> {
662 Utc::now()
663}
664
665#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Default)]
666pub enum Status {
667 #[default]
668 All,
669 Pending,
670 Processing,
671 Processed,
672 Failed,
673}
674
675impl Status {
676 pub fn as_str(&self) -> Option<&'static str> {
677 match self {
678 Status::All => None,
679 Status::Pending => Some("pending"),
680 Status::Processing => Some("processing"),
681 Status::Processed => Some("processed"),
682 Status::Failed => Some("failed"),
683 }
684 }
685}
686
687impl TryFrom<String> for Status {
688 type Error = String;
689
690 fn try_from(s: String) -> Result<Self, Self::Error> {
691 s.parse()
692 }
693}
694
695impl std::str::FromStr for Status {
696 type Err = String;
697
698 fn from_str(s: &str) -> Result<Self, Self::Err> {
699 match s.to_lowercase().as_str() {
700 "all" => Ok(Status::All),
701 "pending" => Ok(Status::Pending),
702 "processing" => Ok(Status::Processing),
703 "processed" => Ok(Status::Processed),
704 "failed" => Ok(Status::Failed),
705 _ => Err(format!("Unknown status: {}", s)),
706 }
707 }
708}
709
710impl Display for Status {
711 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
712 match self {
713 Status::All => write!(f, "all"),
714 Status::Pending => write!(f, "pending"),
715 Status::Processing => write!(f, "processing"),
716 Status::Processed => write!(f, "processed"),
717 Status::Failed => write!(f, "failed"),
718 }
719 }
720}
721
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal implementor used to reach the trait's default method.
    pub struct TestStruct;
    impl ValidateAlertConfig for TestStruct {}

    #[test]
    fn test_resolve_schedule_base() {
        // A schedule that parses is returned unchanged.
        let valid = "0 0 5 * * *";
        assert_eq!(TestStruct::resolve_schedule(valid), valid.to_string());

        // A schedule that does not parse is replaced by the daily default.
        let expected_default = CommonCrons::EveryDay.cron();
        assert_eq!(
            TestStruct::resolve_schedule("invalid_cron"),
            expected_default
        );
    }
}