pub mod expanded;
pub mod ingest;
pub mod json_schema;
pub mod lifted;
pub mod normalized;
pub mod prompt_plan;
pub mod task_plan;
pub mod token_plan;
#[cfg(feature = "python")]
use std::sync::Arc;
#[cfg(feature = "python")]
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(feature = "python")]
use pyo3::exceptions::PyValueError;
#[cfg(feature = "python")]
use pyo3::prelude::*;
#[cfg(feature = "python")]
use pyo3::types::{PyAnyMethods, PyDict, PyModule, PyString, PyType};
#[cfg(feature = "python")]
use pyo3_stub_gen::define_stub_info_gatherer;
#[cfg(feature = "python")]
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
#[cfg(feature = "python")]
use task_plan::PlannedTask;
// Schema loading failures are reported to Python as `ValueError`, carrying the
// error's `to_string()` text as the exception message.
#[cfg(feature = "python")]
impl From<normalized::SchemaLoadError> for PyErr {
    fn from(err: normalized::SchemaLoadError) -> PyErr {
        let message = err.to_string();
        PyValueError::new_err(message)
    }
}
// Schema normalization failures are reported to Python as `ValueError`, carrying
// the error's `to_string()` text as the exception message.
#[cfg(feature = "python")]
impl From<normalized::SchemaNormalizeError> for PyErr {
    fn from(err: normalized::SchemaNormalizeError) -> PyErr {
        let message = err.to_string();
        PyValueError::new_err(message)
    }
}
// Schema expansion failures are reported to Python as `ValueError`, carrying the
// error's `to_string()` text as the exception message.
#[cfg(feature = "python")]
impl From<expanded::SchemaExpandError> for PyErr {
    fn from(err: expanded::SchemaExpandError) -> PyErr {
        let message = err.to_string();
        PyValueError::new_err(message)
    }
}
// Schema lifting failures are reported to Python as `ValueError`, carrying the
// error's `to_string()` text as the exception message.
#[cfg(feature = "python")]
impl From<lifted::SchemaLiftError> for PyErr {
    fn from(err: lifted::SchemaLiftError) -> PyErr {
        let message = err.to_string();
        PyValueError::new_err(message)
    }
}
// Task planning failures are reported to Python as `ValueError`, carrying the
// error's `to_string()` text as the exception message.
#[cfg(feature = "python")]
impl From<task_plan::TaskPlanError> for PyErr {
    fn from(err: task_plan::TaskPlanError) -> PyErr {
        let message = err.to_string();
        PyValueError::new_err(message)
    }
}
// Prompt planning failures are reported to Python as `ValueError`, carrying the
// error's `to_string()` text as the exception message.
#[cfg(feature = "python")]
impl From<prompt_plan::PromptPlanError> for PyErr {
    fn from(err: prompt_plan::PromptPlanError) -> PyErr {
        let message = err.to_string();
        PyValueError::new_err(message)
    }
}
// Initializer for the Python extension module, exported under the name
// `ie_schema` (the Rust function name is not what Python sees).
// Registers the schema entry point, the `Task` base class, its four task
// subclasses, and the `StructureChild` helper class.
#[cfg(feature = "python")]
#[pymodule]
#[pyo3(name = "ie_schema")]
fn ieschema_library(m: &Bound<'_, PyModule>) -> PyResult<()> {
// Entry point: loads a schema and iterates over its planned tasks.
m.add_class::<IESchema>()?;
// Base class extended by every concrete task wrapper below.
m.add_class::<Task>()?;
m.add_class::<ClassificationTask>()?;
m.add_class::<EntityExtractionTask>()?;
m.add_class::<RelationExtractionTask>()?;
m.add_class::<JSONStructureTask>()?;
// Returned by `JSONStructureTask.children`.
m.add_class::<StructureChild>()?;
Ok(())
}
// Python-visible schema object. It owns the planned tasks and the prompt plan
// derived from them, and it acts as its own iterator over the planned tasks
// (see `__iter__` / `__next__`), so the iteration cursor lives on the object.
#[cfg(feature = "python")]
#[gen_stub_pyclass]
#[pyclass(module = "ie_schema")]
pub struct IESchema {
// Planned tasks; the `Arc` is cloned into every task wrapper handed to Python.
task_plan: Arc<task_plan::TaskPlan>,
// Prompt plan built from a clone of the task plan; rendered by `prompt()`.
prompt_plan: prompt_plan::PromptPlan,
// Index of the next task yielded by `__next__`; reset to 0 by `__iter__`.
iter_index: AtomicUsize,
}
// Rust-only constructors for `IESchema`; these are not exposed to Python.
#[cfg(feature = "python")]
impl IESchema {
    /// Runs the conversion pipeline normalized -> expanded -> lifted -> task plan,
    /// then derives the prompt plan from a clone of the task plan.
    ///
    /// Any stage error is converted into a `PyErr` by `?`.
    fn from_normalized(normalized: normalized::NormalizedSchema) -> PyResult<Self> {
        let expanded_schema = expanded::ExpandedSchema::try_from(normalized)?;
        let lifted_schema = lifted::LiftedSchema::try_from(expanded_schema)?;
        let tasks = task_plan::TaskPlan::try_from(lifted_schema)?;
        // `try_from` consumes its argument, so the prompt plan gets its own copy
        // and the original is kept for the shared `Arc`.
        let prompt_plan = prompt_plan::PromptPlan::try_from(tasks.clone())?;
        let task_plan = Arc::new(tasks);
        let iter_index = AtomicUsize::new(0);
        Ok(Self {
            task_plan,
            prompt_plan,
            iter_index,
        })
    }

    /// Parses JSON bytes into a normalized schema and builds the full `IESchema`.
    fn loads_inner_bytes(bytes: &[u8]) -> PyResult<Self> {
        normalized::NormalizedSchema::from_json_bytes(bytes)
            .map_err(PyErr::from)
            .and_then(Self::from_normalized)
    }

    /// String convenience wrapper around [`Self::loads_inner_bytes`].
    fn loads_inner(s: &str) -> PyResult<Self> {
        let bytes = s.as_bytes();
        Self::loads_inner_bytes(bytes)
    }
}
/// Produces the UTF-8 encoded JSON schema text for a Python type.
///
/// Supported types are subclasses of `pydantic.BaseModel` (via
/// `model_json_schema()`) and stdlib dataclasses (via
/// `pydantic.TypeAdapter(type).json_schema()`). The schema object is serialized
/// with `json.dumps(..., ensure_ascii=False)` and encoded as UTF-8.
///
/// # Errors
///
/// * `ValueError` with an installation hint when `type_obj` is a dataclass but
///   `pydantic` cannot be imported.
/// * The generic "unsupported value" `ValueError` when `pydantic` cannot be
///   imported for a non-dataclass, or when the type is neither a model nor a
///   dataclass.
/// * Any exception raised by the Python calls themselves is propagated.
#[cfg(feature = "python")]
fn json_schema_utf8_bytes_from_type<'py>(
    py: Python<'py>,
    type_obj: &Bound<'py, PyType>,
) -> PyResult<Vec<u8>> {
    let json_mod = PyModule::import(py, "json")?;
    let builtins = PyModule::import(py, "builtins")?;
    let dataclasses = PyModule::import(py, "dataclasses")?;

    let is_dc = dataclasses
        .getattr("is_dataclass")?
        .call1((type_obj,))?
        .extract::<bool>()?;

    // Pydantic is required on both supported paths. Only the dataclass case gets
    // the dedicated installation hint; anything else is reported as unsupported.
    let pyd = match PyModule::import(py, "pydantic") {
        Ok(module) => module,
        Err(e) if is_dc => {
            return Err(PyValueError::new_err(format!(
                "IESchema.loads: converting a dataclass to JSON schema requires Pydantic v2 \
                 (install with `uv add pydantic` or `pip install pydantic`). \
                 Original import error: {e}"
            )));
        }
        Err(_) => return Err(loads_unsupported_input_error()),
    };

    let base_model = pyd.getattr("BaseModel")?;
    let issub = builtins.getattr("issubclass")?;
    // A failing `issubclass` call is treated as "not a model" rather than an error.
    let is_model = if let Ok(verdict) = issub.call1((type_obj, &base_model)) {
        verdict.is_truthy()?
    } else {
        false
    };

    // The model check takes precedence over the dataclass check.
    let schema_obj = match (is_model, is_dc) {
        (true, _) => type_obj.call_method0("model_json_schema")?,
        (false, true) => pyd
            .getattr("TypeAdapter")?
            .call1((type_obj,))?
            .call_method0("json_schema")?,
        (false, false) => return Err(loads_unsupported_input_error()),
    };

    let dumps = json_mod.getattr("dumps")?;
    let options = PyDict::new(py);
    options.set_item("ensure_ascii", false)?;
    let text = dumps.call((&schema_obj,), Some(&options))?;
    text.call_method1("encode", ("utf-8",))?.extract()
}
/// Builds the `ValueError` reported when `IESchema.loads` is given a value it
/// cannot turn into a schema.
#[cfg(feature = "python")]
fn loads_unsupported_input_error() -> PyErr {
    let message = "IESchema.loads: expected a JSON `str` (IE ingest or root JSON Schema), a `type` \
                   (stdlib dataclass or Pydantic v2 BaseModel), or an instance of such a type; got an \
                   unsupported value";
    PyValueError::new_err(message)
}
#[cfg(feature = "python")]
#[gen_stub_pymethods]
#[pymethods]
impl IESchema {
    /// Build a schema from a JSON string, a type, or an instance of a type.
    ///
    /// A `str` is parsed directly as JSON. Any other value is resolved to a type
    /// (the value itself when it already is a type, otherwise its type), whose
    /// JSON schema is generated through Pydantic and then loaded.
    ///
    /// Raises `ValueError` when the input is not a supported kind of value.
    #[classmethod]
    fn loads(_cls: &Bound<'_, PyType>, input: &Bound<'_, PyAny>) -> PyResult<Self> {
        if input.is_instance_of::<PyString>() {
            let s: String = input.extract()?;
            return Self::loads_inner(&s);
        }
        // Accept both `Model` and `Model(...)`: an instance is mapped to its type.
        let type_obj: Bound<'_, PyType> = if let Ok(t) = input.cast::<PyType>() {
            t.clone()
        } else {
            input.get_type()
        };
        let utf8 = json_schema_utf8_bytes_from_type(input.py(), &type_obj)?;
        Self::loads_inner_bytes(&utf8)
    }

    /// Read the file at `path` as UTF-8 text and build a schema from its JSON content.
    ///
    /// Raises `ValueError` when the file cannot be read.
    #[classmethod]
    fn load(_cls: &Bound<'_, PyType>, path: String) -> PyResult<Self> {
        let content = std::fs::read_to_string(&path)
            .map_err(|e| PyValueError::new_err(format!("failed to read {}: {}", path, e)))?;
        Self::loads_inner(&content)
    }

    /// Restart iteration and return the schema itself as the iterator.
    ///
    /// The cursor is stored on the schema object, so calling this resets any
    /// iteration already in progress on the same object.
    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
        slf.iter_index.store(0, Ordering::Relaxed);
        slf
    }

    /// Return the next planned task wrapped in its task subclass, or stop
    /// iteration once every task has been yielded.
    ///
    /// A failure to create the Python wrapper object is raised as a Python
    /// exception instead of panicking.
    fn __next__(slf: PyRefMut<'_, Self>) -> PyResult<Option<Py<PyAny>>> {
        let idx = slf.iter_index.load(Ordering::Relaxed);
        if idx >= slf.task_plan.tasks.len() {
            return Ok(None);
        }

        let py = slf.py();
        // Every wrapper shares the plan and only remembers which entry it views.
        let plan = Arc::clone(&slf.task_plan);
        let base = PyClassInitializer::from(Task {});

        let task = match &slf.task_plan.tasks[idx] {
            PlannedTask::Classification(_) => Bound::new(
                py,
                base.add_subclass(ClassificationTask {
                    task_plan: plan,
                    index: idx,
                }),
            )?
            .into_any(),
            PlannedTask::Entity(_) => Bound::new(
                py,
                base.add_subclass(EntityExtractionTask {
                    task_plan: plan,
                    index: idx,
                }),
            )?
            .into_any(),
            PlannedTask::Relation(_) => Bound::new(
                py,
                base.add_subclass(RelationExtractionTask {
                    task_plan: plan,
                    index: idx,
                }),
            )?
            .into_any(),
            PlannedTask::Structure(_) => Bound::new(
                py,
                base.add_subclass(JSONStructureTask {
                    task_plan: plan,
                    index: idx,
                }),
            )?
            .into_any(),
        };

        // Advance only after the wrapper exists, so an error above does not
        // silently skip a task.
        slf.iter_index.store(idx + 1, Ordering::Relaxed);
        Ok(Some(task.unbind()))
    }

    /// Return the prompt plan rendered through its debug-string renderer.
    fn prompt(&self) -> String {
        self.prompt_plan.render_debug_string()
    }
}
// Empty base class for the task wrappers. It is declared `subclass` so that
// `ClassificationTask`, `EntityExtractionTask`, `RelationExtractionTask` and
// `JSONStructureTask` can extend it, and it carries no data of its own.
#[cfg(feature = "python")]
#[gen_stub_pyclass]
#[pyclass(subclass, module = "ie_schema")]
pub struct Task {}
// Python view over one classification entry of a shared task plan.
#[cfg(feature = "python")]
#[gen_stub_pyclass]
#[pyclass(extends = Task, module = "ie_schema")]
pub struct ClassificationTask {
// Plan shared with the `IESchema` that produced this wrapper.
task_plan: Arc<task_plan::TaskPlan>,
// Position in `task_plan.tasks`; the getters expect a `Classification` entry there.
index: usize,
}
// Read-only Python properties of a classification task. Each getter looks up
// the wrapped plan entry; any other variant at that index is a broken invariant.
#[cfg(feature = "python")]
#[gen_stub_pymethods]
#[pymethods]
impl ClassificationTask {
    /// The plan entry's `task` value, converted to a string.
    #[getter]
    fn task(&self) -> String {
        match &self.task_plan.tasks[self.index] {
            PlannedTask::Classification(plan) => plan.task.to_string(),
            _ => unreachable!(),
        }
    }

    /// The plan entry's labels, each converted to a string.
    #[getter]
    fn labels(&self) -> Vec<String> {
        match &self.task_plan.tasks[self.index] {
            PlannedTask::Classification(plan) => {
                plan.labels.iter().map(|label| label.to_string()).collect()
            }
            _ => unreachable!(),
        }
    }

    /// The plan entry's optional threshold.
    #[getter]
    fn threshold(&self) -> Option<f64> {
        match &self.task_plan.tasks[self.index] {
            PlannedTask::Classification(plan) => plan.threshold,
            _ => unreachable!(),
        }
    }

    /// The plan entry's `multi_label` flag.
    #[getter]
    fn multi_label(&self) -> bool {
        match &self.task_plan.tasks[self.index] {
            PlannedTask::Classification(plan) => plan.multi_label,
            _ => unreachable!(),
        }
    }
}
// Python view over one entity-extraction entry of a shared task plan.
#[cfg(feature = "python")]
#[gen_stub_pyclass]
#[pyclass(extends = Task, module = "ie_schema")]
pub struct EntityExtractionTask {
// Plan shared with the `IESchema` that produced this wrapper.
task_plan: Arc<task_plan::TaskPlan>,
// Position in `task_plan.tasks`; the getter expects an `Entity` entry there.
index: usize,
}
// Read-only Python properties of an entity-extraction task.
#[cfg(feature = "python")]
#[gen_stub_pymethods]
#[pymethods]
impl EntityExtractionTask {
    /// The plan entry's entities, each converted to a string.
    #[getter]
    fn entities(&self) -> Vec<String> {
        match &self.task_plan.tasks[self.index] {
            PlannedTask::Entity(plan) => {
                plan.entities.iter().map(|entity| entity.to_string()).collect()
            }
            // Wrappers are only created for matching variants.
            _ => unreachable!(),
        }
    }
}
// Python view over one relation-extraction entry of a shared task plan.
#[cfg(feature = "python")]
#[gen_stub_pyclass]
#[pyclass(extends = Task, module = "ie_schema")]
pub struct RelationExtractionTask {
// Plan shared with the `IESchema` that produced this wrapper.
task_plan: Arc<task_plan::TaskPlan>,
// Position in `task_plan.tasks`; the getters expect a `Relation` entry there.
index: usize,
}
// Read-only Python properties of a relation-extraction task. Each getter looks
// up the wrapped plan entry; any other variant at that index is a broken invariant.
#[cfg(feature = "python")]
#[gen_stub_pymethods]
#[pymethods]
impl RelationExtractionTask {
    /// The plan entry's `relation` value, converted to a string.
    #[getter]
    fn name(&self) -> String {
        match &self.task_plan.tasks[self.index] {
            PlannedTask::Relation(plan) => plan.relation.to_string(),
            _ => unreachable!(),
        }
    }

    /// The plan entry's `head` value, converted to a string.
    #[getter]
    fn head(&self) -> String {
        match &self.task_plan.tasks[self.index] {
            PlannedTask::Relation(plan) => plan.head.to_string(),
            _ => unreachable!(),
        }
    }

    /// The plan entry's `tail` value, converted to a string.
    #[getter]
    fn tail(&self) -> String {
        match &self.task_plan.tasks[self.index] {
            PlannedTask::Relation(plan) => plan.tail.to_string(),
            _ => unreachable!(),
        }
    }

    /// A copy of the plan entry's optional description.
    #[getter]
    fn description(&self) -> Option<String> {
        match &self.task_plan.tasks[self.index] {
            PlannedTask::Relation(plan) => plan.description.clone(),
            _ => unreachable!(),
        }
    }
}
// Python view over one JSON-structure entry of a shared task plan.
#[cfg(feature = "python")]
#[gen_stub_pyclass]
#[pyclass(extends = Task, module = "ie_schema")]
pub struct JSONStructureTask {
// Plan shared with the `IESchema` that produced this wrapper.
task_plan: Arc<task_plan::TaskPlan>,
// Position in `task_plan.tasks`; the getters expect a `Structure` entry there.
index: usize,
}
// Read-only Python properties of a JSON-structure task.
#[cfg(feature = "python")]
#[gen_stub_pymethods]
#[pymethods]
impl JSONStructureTask {
    /// The plan entry's `structure` value, converted to a string.
    #[getter]
    fn name(&self) -> String {
        match &self.task_plan.tasks[self.index] {
            PlannedTask::Structure(plan) => plan.structure.to_string(),
            _ => unreachable!(),
        }
    }

    /// One `StructureChild` per child of the plan entry, in order.
    ///
    /// Each child holds a clone of the shared plan `Arc` plus the
    /// (structure, child) index pair it views.
    #[getter]
    fn children(&self) -> Vec<StructureChild> {
        let plan = match &self.task_plan.tasks[self.index] {
            PlannedTask::Structure(plan) => plan,
            _ => unreachable!(),
        };
        let structure_index = self.index;
        plan.children
            .iter()
            .enumerate()
            .map(|(child_index, _)| StructureChild {
                task_plan: self.task_plan.clone(),
                structure_index,
                child_index,
            })
            .collect()
    }
}
// Python view over a single child of a JSON-structure entry of a shared task
// plan. Unlike the task wrappers, this class does not extend `Task`.
#[cfg(feature = "python")]
#[gen_stub_pyclass]
#[pyclass(module = "ie_schema")]
pub struct StructureChild {
// Plan shared with the `JSONStructureTask` that produced this child.
task_plan: Arc<task_plan::TaskPlan>,
// Position of the owning `Structure` entry in `task_plan.tasks`.
structure_index: usize,
// Position of this child within that entry's `children`.
child_index: usize,
}
// Read-only Python properties of a structure child. Each getter resolves the
// owning `Structure` entry first and then indexes into its children.
#[cfg(feature = "python")]
#[gen_stub_pymethods]
#[pymethods]
impl StructureChild {
    /// The child's `property` value, converted to a string.
    #[getter]
    fn property(&self) -> String {
        match &self.task_plan.tasks[self.structure_index] {
            PlannedTask::Structure(plan) => plan.children[self.child_index].property.to_string(),
            _ => unreachable!(),
        }
    }

    /// The child's choices, each converted to a string.
    #[getter]
    fn choices(&self) -> Vec<String> {
        match &self.task_plan.tasks[self.structure_index] {
            PlannedTask::Structure(plan) => {
                let child = &plan.children[self.child_index];
                child.choices.iter().map(|choice| choice.to_string()).collect()
            }
            _ => unreachable!(),
        }
    }

    /// A copy of the child's optional description.
    #[getter]
    fn description(&self) -> Option<String> {
        match &self.task_plan.tasks[self.structure_index] {
            PlannedTask::Structure(plan) => plan.children[self.child_index].description.clone(),
            _ => unreachable!(),
        }
    }
}
// Invokes pyo3-stub-gen's gatherer macro with the name `stub_info`.
// NOTE(review): presumably this defines a `stub_info` function collecting the
// items annotated with `gen_stub_*` above, for use by a stub-generation binary
// — confirm against the pyo3-stub-gen version in use.
#[cfg(feature = "python")]
define_stub_info_gatherer!(stub_info);