use briefcase_core::{
DecisionSnapshot, ExecutionContext, Input, ModelParameters, Output, Snapshot, SnapshotType,
};
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyList};
/// Python-visible wrapper (exposed to Python as `Input`) around a core `Input`.
#[pyclass(name = "Input")]
pub struct PyInput {
    /// The wrapped core value.
    pub inner: Input,
}

#[pymethods]
impl PyInput {
    /// Creates an input from a name, an arbitrary Python value and a data-type label.
    ///
    /// The Python value is converted with `python_to_json_value`; an error from that
    /// conversion is returned to the caller.
    #[new]
    fn new(name: String, value: PyObject, data_type: String) -> PyResult<Self> {
        Python::with_gil(|py| {
            python_to_json_value(value, py).map(|converted| Self {
                inner: Input::new(name, converted, data_type),
            })
        })
    }

    /// Returns a copy of the input's name.
    #[getter]
    fn name(&self) -> String {
        self.inner.name.clone()
    }

    /// Returns the stored value converted back to a Python object.
    #[getter]
    fn value(&self) -> PyResult<PyObject> {
        let stored = &self.inner.value;
        Python::with_gil(|py| json_value_to_python(stored, py))
    }

    /// Returns a copy of the input's data-type label.
    #[getter]
    fn data_type(&self) -> String {
        self.inner.data_type.clone()
    }

    /// Short textual representation showing the name and data type (not the value).
    fn __repr__(&self) -> String {
        let name = &self.inner.name;
        let data_type = &self.inner.data_type;
        format!("Input(name='{}', data_type='{}')", name, data_type)
    }

    /// Builds a Python dict with the keys `name`, `value` and `data_type`.
    fn to_dict(&self) -> PyResult<PyObject> {
        Python::with_gil(|py| {
            let out = PyDict::new(py);
            out.set_item("name", &self.inner.name)?;
            let py_value = json_value_to_python(&self.inner.value, py)?;
            out.set_item("value", py_value)?;
            out.set_item("data_type", &self.inner.data_type)?;
            Ok(out.into())
        })
    }
}
/// Python-visible wrapper (exposed to Python as `Output`) around a core `Output`.
#[pyclass(name = "Output")]
pub struct PyOutput {
    /// The wrapped core value.
    pub inner: Output,
}

#[pymethods]
impl PyOutput {
    /// Creates an output from a name, an arbitrary Python value and a data-type label.
    ///
    /// The Python value is converted with `python_to_json_value`; an error from that
    /// conversion is returned to the caller.
    #[new]
    fn new(name: String, value: PyObject, data_type: String) -> PyResult<Self> {
        Python::with_gil(|py| {
            python_to_json_value(value, py).map(|converted| Self {
                inner: Output::new(name, converted, data_type),
            })
        })
    }

    /// Replaces the wrapped value with the result of `Output::with_confidence` applied to
    /// a clone of it, then hands back the same Python object so calls can be chained.
    fn with_confidence<'a>(mut slf: PyRefMut<'a, Self>, confidence: f64) -> PyRefMut<'a, Self> {
        let updated = slf.inner.clone().with_confidence(confidence);
        slf.inner = updated;
        slf
    }

    /// Returns a copy of the output's name.
    #[getter]
    fn name(&self) -> String {
        self.inner.name.clone()
    }

    /// Returns the stored value converted back to a Python object.
    #[getter]
    fn value(&self) -> PyResult<PyObject> {
        let stored = &self.inner.value;
        Python::with_gil(|py| json_value_to_python(stored, py))
    }

    /// Returns a copy of the output's data-type label.
    #[getter]
    fn data_type(&self) -> String {
        self.inner.data_type.clone()
    }

    /// Returns the confidence, or `None` when none is stored.
    #[getter]
    fn confidence(&self) -> Option<f64> {
        self.inner.confidence
    }

    /// Short textual representation showing the name and data type (not the value).
    fn __repr__(&self) -> String {
        let name = &self.inner.name;
        let data_type = &self.inner.data_type;
        format!("Output(name='{}', data_type='{}')", name, data_type)
    }

    /// Builds a Python dict with the keys `name`, `value`, `data_type` and, only when a
    /// confidence is stored, `confidence`.
    fn to_dict(&self) -> PyResult<PyObject> {
        Python::with_gil(|py| {
            let out = PyDict::new(py);
            out.set_item("name", &self.inner.name)?;
            let py_value = json_value_to_python(&self.inner.value, py)?;
            out.set_item("value", py_value)?;
            out.set_item("data_type", &self.inner.data_type)?;
            if let Some(score) = self.inner.confidence {
                out.set_item("confidence", score)?;
            }
            Ok(out.into())
        })
    }
}
/// Python-visible wrapper (exposed to Python as `ModelParameters`) around a core
/// `ModelParameters`.
#[pyclass(name = "ModelParameters")]
pub struct PyModelParameters {
    /// The wrapped core value.
    pub inner: ModelParameters,
}

#[pymethods]
impl PyModelParameters {
    /// Creates model parameters for the given model name.
    #[new]
    fn new(model_name: String) -> Self {
        let inner = ModelParameters::new(model_name);
        Self { inner }
    }

    /// Replaces the wrapped value with the result of `ModelParameters::with_provider`
    /// applied to a clone of it, then hands back the same Python object for chaining.
    fn with_provider<'a>(mut slf: PyRefMut<'a, Self>, provider: String) -> PyRefMut<'a, Self> {
        let updated = slf.inner.clone().with_provider(provider);
        slf.inner = updated;
        slf
    }

    /// Converts `value` with `python_to_json_value` and passes it, together with `key`, to
    /// `ModelParameters::with_parameter` on a clone of the wrapped value.
    ///
    /// Returns the same Python object for chaining, or the conversion error, in which case
    /// the wrapped value is left untouched.
    fn with_parameter(
        mut slf: PyRefMut<Self>,
        key: String,
        value: PyObject,
    ) -> PyResult<PyRefMut<Self>> {
        let converted = Python::with_gil(|py| python_to_json_value(value, py))?;
        let updated = slf.inner.clone().with_parameter(key, converted);
        slf.inner = updated;
        Ok(slf)
    }

    /// Returns a copy of the model name.
    #[getter]
    fn model_name(&self) -> String {
        self.inner.model_name.clone()
    }

    /// Returns a copy of the provider, or `None` when none is stored.
    #[getter]
    fn provider(&self) -> Option<String> {
        self.inner.provider.clone()
    }

    /// Returns a freshly built Python dict holding every stored parameter, with each value
    /// converted back to a Python object.
    #[getter]
    fn parameters(&self) -> PyResult<PyObject> {
        Python::with_gil(|py| {
            let out = PyDict::new(py);
            for (param_name, json) in &self.inner.parameters {
                let py_value = json_value_to_python(json, py)?;
                out.set_item(param_name, py_value)?;
            }
            Ok(out.into())
        })
    }

    /// Short textual representation showing only the model name.
    fn __repr__(&self) -> String {
        let model_name = &self.inner.model_name;
        format!("ModelParameters(model_name='{}')", model_name)
    }
}
#[pyclass(name = "ExecutionContext")]
pub struct PyExecutionContext {
pub inner: ExecutionContext,
}
#[pymethods]
impl PyExecutionContext {
#[new]
fn new() -> Self {
Self {
inner: ExecutionContext::new(),
}
}
fn with_runtime_version<'a>(
mut slf: PyRefMut<'a, Self>,
version: String,
) -> PyRefMut<'a, Self> {
slf.inner = slf.inner.clone().with_runtime_version(version);
slf
}
fn with_dependency<'a>(
mut slf: PyRefMut<'a, Self>,
name: String,
version: String,
) -> PyRefMut<'a, Self> {
slf.inner = slf.inner.clone().with_dependency(name, version);
slf
}
fn with_random_seed<'a>(mut slf: PyRefMut<'a, Self>, seed: i64) -> PyRefMut<'a, Self> {
slf.inner = slf.inner.clone().with_random_seed(seed);
slf
}
fn with_env_var<'a>(
mut slf: PyRefMut<'a, Self>,
key: String,
value: String,
) -> PyRefMut<'a, Self> {
slf.inner = slf.inner.clone().with_env_var(key, value);
slf
}
#[getter]
fn runtime_version(&self) -> Option<String> {
self.inner.runtime_version.clone()
}
#[getter]
fn dependencies(&self) -> PyObject {
Python::with_gil(|py| {
let dict = PyDict::new(py);
for (key, value) in &self.inner.dependencies {
dict.set_item(key, value).unwrap();
}
dict.into()
})
}
#[getter]
fn random_seed(&self) -> Option<i64> {
self.inner.random_seed
}
#[getter]
fn environment_variables(&self) -> PyObject {
Python::with_gil(|py| {
let dict = PyDict::new(py);
for (key, value) in &self.inner.environment_variables {
dict.set_item(key, value).unwrap();
}
dict.into()
})
}
#[getter]
fn hardware_info(&self) -> PyObject {
Python::with_gil(|py| {
let dict = PyDict::new(py);
for (key, value) in &self.inner.hardware_info {
let py_value = json_value_to_python(value, py).unwrap();
dict.set_item(key, py_value).unwrap();
}
dict.into()
})
}
fn __repr__(&self) -> String {
"ExecutionContext()".to_string()
}
}
/// Python-visible wrapper (exposed to Python as `DecisionSnapshot`) around a core
/// `DecisionSnapshot`.
#[pyclass(name = "DecisionSnapshot")]
pub struct PyDecisionSnapshot {
    /// The wrapped core value.
    pub inner: DecisionSnapshot,
}

#[pymethods]
impl PyDecisionSnapshot {
    /// Creates a decision snapshot for the given function name.
    #[new]
    fn new(function_name: String) -> Self {
        let inner = DecisionSnapshot::new(function_name);
        Self { inner }
    }

    /// Replaces the wrapped value with the result of `DecisionSnapshot::with_module` applied
    /// to a clone of it, then hands back the same Python object for chaining.
    fn with_module<'a>(mut slf: PyRefMut<'a, Self>, module_name: String) -> PyRefMut<'a, Self> {
        let updated = slf.inner.clone().with_module(module_name);
        slf.inner = updated;
        slf
    }

    /// Passes a clone of the given input's core value to `DecisionSnapshot::add_input` on a
    /// clone of the wrapped value, stores the result, and returns the same Python object.
    fn add_input<'a>(mut slf: PyRefMut<'a, Self>, input: PyRef<'a, PyInput>) -> PyRefMut<'a, Self> {
        let core_input = input.inner.clone();
        let updated = slf.inner.clone().add_input(core_input);
        slf.inner = updated;
        slf
    }

    /// Passes a clone of the given output's core value to `DecisionSnapshot::add_output` on
    /// a clone of the wrapped value, stores the result, and returns the same Python object.
    fn add_output<'a>(
        mut slf: PyRefMut<'a, Self>,
        output: PyRef<'a, PyOutput>,
    ) -> PyRefMut<'a, Self> {
        let core_output = output.inner.clone();
        let updated = slf.inner.clone().add_output(core_output);
        slf.inner = updated;
        slf
    }

    /// Passes a clone of the given parameters' core value to
    /// `DecisionSnapshot::with_model_parameters` on a clone of the wrapped value, stores the
    /// result, and returns the same Python object.
    fn with_model_parameters<'a>(
        mut slf: PyRefMut<'a, Self>,
        params: PyRef<'a, PyModelParameters>,
    ) -> PyRefMut<'a, Self> {
        let core_params = params.inner.clone();
        let updated = slf.inner.clone().with_model_parameters(core_params);
        slf.inner = updated;
        slf
    }

    /// Replaces the wrapped value with the result of `DecisionSnapshot::with_execution_time`
    /// applied to a clone of it, then hands back the same Python object for chaining.
    fn with_execution_time<'a>(mut slf: PyRefMut<'a, Self>, time_ms: f64) -> PyRefMut<'a, Self> {
        let updated = slf.inner.clone().with_execution_time(time_ms);
        slf.inner = updated;
        slf
    }

    /// Replaces the wrapped value with the result of `DecisionSnapshot::add_tag` applied to
    /// a clone of it, then hands back the same Python object for chaining.
    fn add_tag<'a>(mut slf: PyRefMut<'a, Self>, key: String, value: String) -> PyRefMut<'a, Self> {
        let updated = slf.inner.clone().add_tag(key, value);
        slf.inner = updated;
        slf
    }

    /// Returns a copy of the function name.
    #[getter]
    fn function_name(&self) -> String {
        self.inner.function_name.clone()
    }

    /// Returns a copy of the module name, or `None` when none is stored.
    #[getter]
    fn module_name(&self) -> Option<String> {
        self.inner.module_name.clone()
    }

    /// Returns the execution time in milliseconds, or `None` when none is stored.
    #[getter]
    fn execution_time_ms(&self) -> Option<f64> {
        self.inner.execution_time_ms
    }

    /// Returns a new Python list of `Input` objects, each wrapping a clone of a stored input.
    #[getter]
    fn inputs(&self) -> PyResult<PyObject> {
        Python::with_gil(|py| {
            let items = PyList::empty(py);
            for core_input in &self.inner.inputs {
                let wrapped = Py::new(
                    py,
                    PyInput {
                        inner: core_input.clone(),
                    },
                )?;
                items.append(wrapped)?;
            }
            Ok(items.into())
        })
    }

    /// Returns a new Python list of `Output` objects, each wrapping a clone of a stored
    /// output.
    #[getter]
    fn outputs(&self) -> PyResult<PyObject> {
        Python::with_gil(|py| {
            let items = PyList::empty(py);
            for core_output in &self.inner.outputs {
                let wrapped = Py::new(
                    py,
                    PyOutput {
                        inner: core_output.clone(),
                    },
                )?;
                items.append(wrapped)?;
            }
            Ok(items.into())
        })
    }

    /// Returns a freshly built Python dict of the stored tags.
    #[getter]
    fn tags(&self) -> PyResult<PyObject> {
        Python::with_gil(|py| {
            let out = PyDict::new(py);
            for (tag_key, tag_value) in &self.inner.tags {
                out.set_item(tag_key, tag_value)?;
            }
            Ok(out.into())
        })
    }

    /// Short textual representation showing only the function name.
    fn __repr__(&self) -> String {
        let function_name = &self.inner.function_name;
        format!("DecisionSnapshot(function_name='{}')", function_name)
    }
}
/// Python-visible wrapper (exposed to Python as `Snapshot`) around a core `Snapshot`.
#[pyclass(name = "Snapshot")]
pub struct PySnapshot {
    /// The wrapped core value.
    pub inner: Snapshot,
}

#[pymethods]
impl PySnapshot {
    /// Creates a snapshot of the requested type.
    ///
    /// Accepted values are exactly `"session"`, `"decision"` and `"batch"`; any other
    /// string produces a Python `ValueError`.
    #[new]
    fn new(snapshot_type: String) -> PyResult<Self> {
        let kind = match snapshot_type.as_str() {
            "session" => SnapshotType::Session,
            "decision" => SnapshotType::Decision,
            "batch" => SnapshotType::Batch,
            unknown => {
                return Err(pyo3::exceptions::PyValueError::new_err(format!(
                    "Invalid snapshot type: {}",
                    unknown
                )));
            }
        };
        Ok(Self {
            inner: Snapshot::new(kind),
        })
    }

    /// Passes a clone of the given decision's core value to `Snapshot::add_decision`.
    fn add_decision(&mut self, decision: PyRef<PyDecisionSnapshot>) {
        let core_decision = decision.inner.clone();
        self.inner.add_decision(core_decision);
    }

    /// Returns a new Python list of `DecisionSnapshot` objects, each wrapping a clone of a
    /// stored decision.
    #[getter]
    fn decisions(&self) -> PyResult<PyObject> {
        Python::with_gil(|py| {
            let items = PyList::empty(py);
            for core_decision in &self.inner.decisions {
                let wrapped = Py::new(
                    py,
                    PyDecisionSnapshot {
                        inner: core_decision.clone(),
                    },
                )?;
                items.append(wrapped)?;
            }
            Ok(items.into())
        })
    }

    /// Returns the lowercased `Debug` rendering of the snapshot type.
    #[getter]
    fn snapshot_type(&self) -> String {
        let debug_name = format!("{:?}", self.inner.snapshot_type);
        debug_name.to_lowercase()
    }

    /// Textual representation showing the lowercased type and the number of decisions.
    fn __repr__(&self) -> String {
        let type_name = format!("{:?}", self.inner.snapshot_type).to_lowercase();
        let decision_count = self.inner.decisions.len();
        format!(
            "Snapshot(type='{}', decisions={})",
            type_name, decision_count
        )
    }
}
/// Python-visible wrapper (exposed to Python as `SnapshotQuery`) around
/// `briefcase_core::storage::SnapshotQuery`.
#[pyclass(name = "SnapshotQuery")]
pub struct PySnapshotQuery {
    /// The wrapped core query.
    pub inner: briefcase_core::storage::SnapshotQuery,
}

#[pymethods]
impl PySnapshotQuery {
    /// Creates a query via `SnapshotQuery::new`.
    #[new]
    fn new() -> Self {
        let inner = briefcase_core::storage::SnapshotQuery::new();
        Self { inner }
    }

    /// Replaces the wrapped query with the result of `SnapshotQuery::with_function_name`
    /// applied to a clone of it, then hands back the same Python object for chaining.
    fn with_function_name<'a>(
        mut slf: PyRefMut<'a, Self>,
        function_name: String,
    ) -> PyRefMut<'a, Self> {
        let updated = slf.inner.clone().with_function_name(function_name);
        slf.inner = updated;
        slf
    }

    /// Replaces the wrapped query with the result of `SnapshotQuery::with_module_name`
    /// applied to a clone of it, then hands back the same Python object for chaining.
    fn with_module_name<'a>(
        mut slf: PyRefMut<'a, Self>,
        module_name: String,
    ) -> PyRefMut<'a, Self> {
        let updated = slf.inner.clone().with_module_name(module_name);
        slf.inner = updated;
        slf
    }

    /// Replaces the wrapped query with the result of `SnapshotQuery::with_limit` applied to
    /// a clone of it, then hands back the same Python object for chaining.
    fn with_limit<'a>(mut slf: PyRefMut<'a, Self>, limit: usize) -> PyRefMut<'a, Self> {
        let updated = slf.inner.clone().with_limit(limit);
        slf.inner = updated;
        slf
    }

    /// Replaces the wrapped query with the result of `SnapshotQuery::with_offset` applied to
    /// a clone of it, then hands back the same Python object for chaining.
    fn with_offset<'a>(mut slf: PyRefMut<'a, Self>, offset: usize) -> PyRefMut<'a, Self> {
        let updated = slf.inner.clone().with_offset(offset);
        slf.inner = updated;
        slf
    }

    /// Replaces the wrapped query with the result of `SnapshotQuery::with_tag` applied to a
    /// clone of it, then hands back the same Python object for chaining.
    fn with_tag<'a>(mut slf: PyRefMut<'a, Self>, key: String, value: String) -> PyRefMut<'a, Self> {
        let updated = slf.inner.clone().with_tag(key, value);
        slf.inner = updated;
        slf
    }

    /// Fixed textual representation; no query fields are included.
    fn __repr__(&self) -> String {
        String::from("SnapshotQuery()")
    }
}
/// Converts an arbitrary Python object into a `serde_json::Value`.
///
/// Conversions are attempted in this order, and the first one that applies wins:
///
/// 1. `None` becomes `Null`.
/// 2. A value extractable as `bool` becomes `Bool`. This is tried before the integer
///    branches so that `True`/`False` are not captured as numbers.
/// 3. A value extractable as `i64` becomes an integer `Number`.
/// 4. A value extractable as `u64` becomes an integer `Number`, so that non-negative
///    integers above `i64::MAX` keep their exact value instead of degrading to a float.
/// 5. A value extractable as `f64` becomes a float `Number`; values that
///    `serde_json::Number::from_f64` rejects (NaN and infinities) become `Null`.
/// 6. A value extractable as `String` becomes `String`.
/// 7. A `list` becomes `Array`, converting every element recursively.
/// 8. A `dict` becomes `Object`, converting every value recursively.
/// 9. Anything else is stored as a `String` holding the object's `to_string()` rendering.
///
/// # Errors
///
/// Returns an error when a `dict` key cannot be extracted as a `String`, or when a nested
/// conversion fails.
pub fn python_to_json_value(obj: PyObject, py: Python) -> PyResult<serde_json::Value> {
    if obj.is_none(py) {
        return Ok(serde_json::Value::Null);
    }
    if let Ok(flag) = obj.extract::<bool>(py) {
        return Ok(serde_json::Value::Bool(flag));
    }
    if let Ok(int) = obj.extract::<i64>(py) {
        return Ok(serde_json::Value::Number(serde_json::Number::from(int)));
    }
    // Without this branch, integers in (i64::MAX, u64::MAX] would fall through to the
    // float extraction below and silently lose precision.
    if let Ok(uint) = obj.extract::<u64>(py) {
        return Ok(serde_json::Value::Number(serde_json::Number::from(uint)));
    }
    if let Ok(float) = obj.extract::<f64>(py) {
        // JSON has no representation for NaN/infinity; those are mapped to null.
        return Ok(match serde_json::Number::from_f64(float) {
            Some(number) => serde_json::Value::Number(number),
            None => serde_json::Value::Null,
        });
    }
    if let Ok(text) = obj.extract::<String>(py) {
        return Ok(serde_json::Value::String(text));
    }
    if let Ok(list) = obj.downcast_bound::<PyList>(py) {
        let mut items = Vec::new();
        for item in list.iter() {
            items.push(python_to_json_value(item.into(), py)?);
        }
        return Ok(serde_json::Value::Array(items));
    }
    if let Ok(dict) = obj.downcast_bound::<PyDict>(py) {
        let mut map = serde_json::Map::new();
        for (key, value) in dict.iter() {
            let key_str = key.extract::<String>()?;
            map.insert(key_str, python_to_json_value(value.into(), py)?);
        }
        return Ok(serde_json::Value::Object(map));
    }
    // Fallback for every other type: keep a textual rendering rather than failing.
    Ok(serde_json::Value::String(obj.to_string()))
}
/// Converts a `serde_json::Value` into the corresponding Python object.
///
/// - `Null` becomes `None`.
/// - `Bool` and `String` become their Python counterparts.
/// - `Number` becomes a Python integer when it is representable as `i64` or `u64`
///   (checked in that order, so values above `i64::MAX` stay exact), otherwise a Python
///   float when it is representable as `f64`, otherwise `None`.
/// - `Array` becomes a `list` and `Object` becomes a `dict`, converting every element
///   recursively.
///
/// # Errors
///
/// Returns an error when appending to the list or inserting into the dict fails, or when
/// a nested conversion fails.
// NOTE(review): `to_object` is presumably what triggers the deprecation lint silenced
// here — confirm against the pyo3 version in use before migrating.
#[allow(deprecated)]
pub fn json_value_to_python(value: &serde_json::Value, py: Python) -> PyResult<PyObject> {
    match value {
        serde_json::Value::Null => Ok(py.None()),
        serde_json::Value::Bool(flag) => Ok(flag.to_object(py)),
        serde_json::Value::Number(number) => {
            if let Some(int) = number.as_i64() {
                Ok(int.to_object(py))
            } else if let Some(uint) = number.as_u64() {
                // Values above i64::MAX: keep them as exact integers instead of
                // letting them fall through to the lossy float conversion.
                Ok(uint.to_object(py))
            } else if let Some(float) = number.as_f64() {
                Ok(float.to_object(py))
            } else {
                Ok(py.None())
            }
        }
        serde_json::Value::String(text) => Ok(text.to_object(py)),
        serde_json::Value::Array(items) => {
            let list = PyList::empty(py);
            for item in items {
                list.append(json_value_to_python(item, py)?)?;
            }
            Ok(list.into())
        }
        serde_json::Value::Object(entries) => {
            let dict = PyDict::new(py);
            for (key, entry) in entries {
                dict.set_item(key, json_value_to_python(entry, py)?)?;
            }
            Ok(dict.into())
        }
    }
}