use crate::bindings::json_utils::{json_value_to_py, py_to_json_value};
use crate::task_manager::{
CancellationToken, TaskBuilder, TaskFilter, TaskHandle, TaskInfo, TaskManager,
TaskManagerConfig, TaskStatus,
};
use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
/// Python-facing wrapper around a core [`TaskStatus`], exposed as `TaskStatus`.
///
/// The `eq` option makes Python `==` use the Rust `PartialEq` implementation.
/// `__hash__` is defined explicitly in the method block so that two wrappers
/// holding the same status also hash identically (needed for `set`/`dict` use,
/// since every `status` getter call produces a fresh wrapper object).
#[pyclass(name = "TaskStatus", eq)]
#[derive(Clone, PartialEq)]
pub struct PyTaskStatus {
    inner: TaskStatus,
}

#[pymethods]
impl PyTaskStatus {
    /// Class attribute wrapping [`TaskStatus::Pending`].
    #[classattr]
    #[allow(non_snake_case)] // exposed to Python as an upper-case constant
    fn PENDING() -> Self {
        Self {
            inner: TaskStatus::Pending,
        }
    }

    /// Class attribute wrapping [`TaskStatus::Running`].
    #[classattr]
    #[allow(non_snake_case)] // exposed to Python as an upper-case constant
    fn RUNNING() -> Self {
        Self {
            inner: TaskStatus::Running,
        }
    }

    /// Class attribute wrapping [`TaskStatus::Paused`].
    #[classattr]
    #[allow(non_snake_case)] // exposed to Python as an upper-case constant
    fn PAUSED() -> Self {
        Self {
            inner: TaskStatus::Paused,
        }
    }

    /// Class attribute wrapping [`TaskStatus::Completed`].
    #[classattr]
    #[allow(non_snake_case)] // exposed to Python as an upper-case constant
    fn COMPLETED() -> Self {
        Self {
            inner: TaskStatus::Completed,
        }
    }

    /// Class attribute wrapping [`TaskStatus::Failed`].
    #[classattr]
    #[allow(non_snake_case)] // exposed to Python as an upper-case constant
    fn FAILED() -> Self {
        Self {
            inner: TaskStatus::Failed,
        }
    }

    /// Class attribute wrapping [`TaskStatus::Cancelled`].
    #[classattr]
    #[allow(non_snake_case)] // exposed to Python as an upper-case constant
    fn CANCELLED() -> Self {
        Self {
            inner: TaskStatus::Cancelled,
        }
    }

    /// Returns the result of `TaskStatus::is_terminal` for the wrapped status.
    fn is_terminal(&self) -> bool {
        self.inner.is_terminal()
    }

    /// Returns the result of `TaskStatus::is_active` for the wrapped status.
    fn is_active(&self) -> bool {
        self.inner.is_active()
    }

    /// Hashes the wrapped status by its enum discriminant.
    ///
    /// Values that compare equal share a discriminant, so this stays
    /// consistent with the `__eq__` generated by the `eq` class option.
    fn __hash__(&self) -> u64 {
        // Fully qualified paths keep this block independent of the file's imports.
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        std::hash::Hash::hash(&std::mem::discriminant(&self.inner), &mut hasher);
        std::hash::Hasher::finish(&hasher)
    }

    /// Returns `TaskStatus.` followed by the `Debug` rendering of the wrapped status.
    fn __repr__(&self) -> String {
        format!("TaskStatus.{:?}", self.inner)
    }

    /// Returns the lower-cased `Debug` rendering of the wrapped status.
    fn __str__(&self) -> String {
        format!("{:?}", self.inner).to_lowercase()
    }
}
/// Wraps a core [`TaskStatus`] in its Python-facing type.
impl From<TaskStatus> for PyTaskStatus {
    fn from(inner: TaskStatus) -> Self {
        Self { inner }
    }
}
#[pyclass(name = "TaskInfo")]
#[derive(Clone)]
pub struct PyTaskInfo {
inner: TaskInfo,
}
#[pymethods]
impl PyTaskInfo {
#[getter]
fn id(&self) -> &str {
&self.inner.id
}
#[getter]
fn name(&self) -> &str {
&self.inner.name
}
#[getter]
fn task_type(&self) -> &str {
&self.inner.task_type
}
#[getter]
fn status(&self) -> PyTaskStatus {
self.inner.status.into()
}
#[getter]
fn progress(&self) -> u8 {
self.inner.progress
}
#[getter]
fn progress_message(&self) -> Option<&str> {
self.inner.progress_message.as_deref()
}
#[getter]
fn created_at(&self) -> f64 {
self.inner
.created_at
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs_f64()
}
#[getter]
fn started_at(&self) -> Option<f64> {
self.inner.started_at.map(|t| {
t.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs_f64()
})
}
#[getter]
fn finished_at(&self) -> Option<f64> {
self.inner.finished_at.map(|t| {
t.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs_f64()
})
}
#[getter]
fn error(&self) -> Option<&str> {
self.inner.error.as_deref()
}
#[getter]
fn result(&self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
match &self.inner.result {
Some(v) => Ok(Some(json_value_to_py(py, v)?)),
None => Ok(None),
}
}
fn get_metadata(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
let dict = pyo3::types::PyDict::new(py);
for (k, v) in &self.inner.metadata {
dict.set_item(k, json_value_to_py(py, v)?)?;
}
Ok(dict.into())
}
fn get_labels(&self) -> std::collections::HashMap<String, String> {
self.inner.labels.clone()
}
fn to_json(&self) -> PyResult<String> {
serde_json::to_string(&self.inner).map_err(|e| PyRuntimeError::new_err(e.to_string()))
}
fn __repr__(&self) -> String {
format!(
"TaskInfo(id='{}', name='{}', status={:?}, progress={})",
self.inner.id, self.inner.name, self.inner.status, self.inner.progress
)
}
}
/// Python-facing wrapper around a [`CancellationToken`], exposed as `CancellationToken`.
#[pyclass(name = "CancellationToken")]
#[derive(Clone)]
pub struct PyCancellationToken {
    inner: CancellationToken,
}

#[pymethods]
impl PyCancellationToken {
    /// Creates a wrapper around a token built with `CancellationToken::new()`.
    #[new]
    fn new() -> Self {
        let inner = CancellationToken::new();
        Self { inner }
    }

    /// Calls `cancel()` on the wrapped token.
    fn cancel(&self) {
        self.inner.cancel();
    }

    /// Whether the wrapped token reports itself as cancelled.
    #[getter]
    fn is_cancelled(&self) -> bool {
        self.inner.is_cancelled()
    }

    /// Wraps the token returned by the wrapped token's `child()`.
    fn child(&self) -> Self {
        let inner = self.inner.child();
        Self { inner }
    }

    /// Shows the current cancelled flag.
    fn __repr__(&self) -> String {
        let cancelled = self.inner.is_cancelled();
        format!("CancellationToken(cancelled={})", cancelled)
    }
}
/// Python-facing wrapper around a [`TaskBuilder`], exposed as `TaskBuilder`.
///
/// The builder methods do not mutate the receiver: each one clones the wrapped
/// builder, applies the change, and returns a new wrapper.
#[pyclass(name = "TaskBuilder")]
#[derive(Clone)]
pub struct PyTaskBuilder {
    inner: TaskBuilder,
}

#[pymethods]
impl PyTaskBuilder {
    /// Creates a builder via `TaskBuilder::new(name, task_type)`.
    #[new]
    fn new(name: &str, task_type: &str) -> Self {
        Self {
            inner: TaskBuilder::new(name, task_type),
        }
    }

    /// Returns a new builder with `key` set to the JSON conversion of `value`.
    ///
    /// Propagates the error from `py_to_json_value` when `value` cannot be
    /// converted.
    fn metadata(&self, py: Python<'_>, key: &str, value: Py<PyAny>) -> PyResult<Self> {
        // `bind` already hands back a borrowed `Bound`; cloning it first only
        // adds a reference-count round-trip.
        let json_value = py_to_json_value(value.bind(py))?;
        Ok(Self {
            inner: self.inner.clone().metadata(key, json_value),
        })
    }

    /// Returns a new builder with the label `key` = `value` applied.
    fn label(&self, key: &str, value: &str) -> Self {
        Self {
            inner: self.inner.clone().label(key, value),
        }
    }

    /// Fixed placeholder representation; builder contents are not shown.
    fn __repr__(&self) -> String {
        "TaskBuilder(...)".to_string()
    }
}
/// Python-facing wrapper around a [`TaskFilter`], exposed as `TaskFilter`.
///
/// Every refinement method leaves the receiver untouched and returns a new
/// wrapper built from a clone of the wrapped filter.
#[pyclass(name = "TaskFilter")]
#[derive(Clone)]
pub struct PyTaskFilter {
    inner: TaskFilter,
}

#[pymethods]
impl PyTaskFilter {
    /// Creates a wrapper around `TaskFilter::new()`.
    #[new]
    fn new() -> Self {
        let inner = TaskFilter::new();
        Self { inner }
    }

    /// Returns a copy of this filter refined with `TaskFilter::status`.
    fn status(&self, status: &PyTaskStatus) -> Self {
        let inner = self.inner.clone().status(status.inner);
        Self { inner }
    }

    /// Returns a copy of this filter refined with `TaskFilter::task_type`.
    fn task_type(&self, t: &str) -> Self {
        let inner = self.inner.clone().task_type(t);
        Self { inner }
    }

    /// Returns a copy of this filter refined with `TaskFilter::label`.
    fn label(&self, key: &str, value: &str) -> Self {
        let inner = self.inner.clone().label(key, value);
        Self { inner }
    }

    /// Returns a copy of this filter refined with `TaskFilter::active`.
    fn active(&self) -> Self {
        let inner = self.inner.clone().active();
        Self { inner }
    }

    /// Returns what `TaskFilter::matches` reports for the given task info.
    fn matches(&self, info: &PyTaskInfo) -> bool {
        let candidate = &info.inner;
        self.inner.matches(candidate)
    }

    /// Shows the filter's `active_only` and `task_type` fields.
    fn __repr__(&self) -> String {
        let filter = &self.inner;
        format!(
            "TaskFilter(active_only={}, task_type={:?})",
            filter.active_only, filter.task_type
        )
    }
}
/// Python-facing wrapper around a [`TaskHandle`], exposed as `TaskHandle`.
///
/// Every method forwards to the wrapped handle.
#[pyclass(name = "TaskHandle")]
pub struct PyTaskHandle {
    inner: TaskHandle,
}

#[pymethods]
impl PyTaskHandle {
    /// The id reported by the wrapped handle.
    #[getter]
    fn id(&self) -> &str {
        self.inner.id()
    }

    /// Returns the handle's current `info()` wrapped as `TaskInfo`.
    fn info(&self) -> PyTaskInfo {
        PyTaskInfo {
            inner: self.inner.info(),
        }
    }

    /// The status reported by the wrapped handle.
    #[getter]
    fn status(&self) -> PyTaskStatus {
        self.inner.status().into()
    }

    /// The progress value reported by the wrapped handle.
    #[getter]
    fn progress(&self) -> u8 {
        self.inner.progress()
    }

    /// Forwards `progress` and the optional `message` to the handle's `set_progress`.
    #[pyo3(signature = (progress, message=None))]
    fn set_progress(&self, progress: u8, message: Option<&str>) {
        self.inner.set_progress(progress, message);
    }

    /// Forwards a log entry (`level`, `message`) to the handle's `log`.
    fn log(&self, level: &str, message: &str) {
        self.inner.log(level, message);
    }

    /// Forwards `line` to the handle's `stdout`.
    fn stdout(&self, line: &str) {
        self.inner.stdout(line);
    }

    /// Forwards `line` to the handle's `stderr`.
    fn stderr(&self, line: &str) {
        self.inner.stderr(line);
    }

    /// Whether the wrapped handle reports itself as cancelled.
    #[getter]
    fn is_cancelled(&self) -> bool {
        self.inner.is_cancelled()
    }

    /// Wraps the token returned by the handle's `cancel_token()`.
    fn cancel_token(&self) -> PyCancellationToken {
        PyCancellationToken {
            inner: self.inner.cancel_token(),
        }
    }

    /// Calls `start()` on the wrapped handle.
    fn start(&self) {
        self.inner.start();
    }

    /// Passes a result to the handle's `complete`.
    ///
    /// `result` is optional from Python. When it is omitted or `None`, an
    /// empty JSON object (`{}`) is passed instead. Otherwise the object is
    /// converted with `py_to_json_value`, and a conversion error is propagated
    /// without calling `complete`.
    // The explicit signature keeps `result` optional; without it, current PyO3
    // treats a trailing `Option<T>` as a required argument.
    #[pyo3(signature = (result=None))]
    fn complete(&self, py: Python<'_>, result: Option<Py<PyAny>>) -> PyResult<()> {
        let json_result = match result {
            // `bind` already yields a borrowed `Bound`; no clone is needed.
            Some(obj) => py_to_json_value(obj.bind(py))?,
            None => serde_json::json!({}),
        };
        self.inner.complete(json_result);
        Ok(())
    }

    /// Forwards `error` to the handle's `fail`.
    fn fail(&self, error: &str) {
        self.inner.fail(error);
    }

    /// Short summary containing the id, status and progress.
    fn __repr__(&self) -> String {
        format!(
            "TaskHandle(id='{}', status={:?}, progress={})",
            self.inner.id(),
            self.inner.status(),
            self.inner.progress()
        )
    }
}
/// Python-facing wrapper around a [`TaskManagerConfig`], exposed as `TaskManagerConfig`.
#[pyclass(name = "TaskManagerConfig")]
#[derive(Clone)]
pub struct PyTaskManagerConfig {
    inner: TaskManagerConfig,
}

#[pymethods]
impl PyTaskManagerConfig {
    /// Builds a configuration from a retention period (whole seconds) and a
    /// concurrency limit.
    ///
    /// From Python both arguments are optional and default to `3600` and
    /// `100`. Every other configuration field takes its `Default` value.
    #[new]
    #[pyo3(signature = (retention_seconds=3600, max_concurrent=100))]
    fn new(retention_seconds: u64, max_concurrent: usize) -> Self {
        let inner = TaskManagerConfig {
            retention_period: Duration::from_secs(retention_seconds),
            max_concurrent,
            ..Default::default()
        };
        Self { inner }
    }

    /// The retention period in whole seconds.
    #[getter]
    fn retention_seconds(&self) -> u64 {
        self.inner.retention_period.as_secs()
    }

    /// The configured `max_concurrent` value.
    #[getter]
    fn max_concurrent(&self) -> usize {
        self.inner.max_concurrent
    }

    /// Shows the retention period in seconds and the concurrency limit.
    fn __repr__(&self) -> String {
        let retention = self.retention_seconds();
        let limit = self.max_concurrent();
        format!(
            "TaskManagerConfig(retention_seconds={}, max_concurrent={})",
            retention, limit
        )
    }
}
#[pyclass(name = "TaskManager")]
pub struct PyTaskManager {
inner: Arc<TaskManager>,
}
#[pymethods]
impl PyTaskManager {
#[new]
#[pyo3(signature = (config=None))]
fn new(config: Option<PyTaskManagerConfig>) -> Self {
let cfg = config.map(|c| c.inner).unwrap_or_default();
Self {
inner: Arc::new(TaskManager::new(cfg)),
}
}
fn create(&self, builder: &PyTaskBuilder) -> PyTaskHandle {
PyTaskHandle {
inner: self.inner.create(builder.inner.clone()),
}
}
fn create_task(&self, name: &str, task_type: &str) -> PyTaskHandle {
PyTaskHandle {
inner: self.inner.create(TaskBuilder::new(name, task_type)),
}
}
fn get(&self, id: &str) -> Option<PyTaskInfo> {
self.inner.get(id).map(|info| PyTaskInfo { inner: info })
}
fn get_handle(&self, id: &str) -> Option<PyTaskHandle> {
self.inner
.get_handle(id)
.map(|handle| PyTaskHandle { inner: handle })
}
#[pyo3(signature = (filter=None))]
fn list(&self, filter: Option<&PyTaskFilter>) -> Vec<PyTaskInfo> {
let f = filter.map(|f| f.inner.clone()).unwrap_or_default();
self.inner
.list(&f)
.into_iter()
.map(|info| PyTaskInfo { inner: info })
.collect()
}
fn list_active(&self) -> Vec<PyTaskInfo> {
self.inner
.list(&TaskFilter::new().active())
.into_iter()
.map(|info| PyTaskInfo { inner: info })
.collect()
}
fn cancel(&self, id: &str) -> PyResult<()> {
self.inner
.cancel(id)
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
}
fn pause(&self, id: &str) -> PyResult<()> {
self.inner
.pause(id)
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
}
fn resume(&self, id: &str) -> PyResult<()> {
self.inner
.resume(id)
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
}
fn remove(&self, id: &str) -> PyResult<()> {
self.inner
.remove(id)
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
}
fn cleanup(&self) {
self.inner.cleanup();
}
fn __repr__(&self) -> String {
"TaskManager()".to_string()
}
}