import json
import time
def get_name():
return "My python Worker"
def get_short_description():
return "My python Worker"
def get_description():
return """This is my long description
over multilines
"""
def get_version():
return "0.0.3"
def get_parameters():
return [
{
"identifier": "source_path",
"label": "My parameter",
"kind": ["string"],
"required": True,
},
{
"identifier": "destination_path",
"label": "My array parameter",
"kind": ["string"],
"required": False,
}
]
def init():
print("Initialise Python worker...")
def process(handle_callback, parameters, job_id):
print("Job ID: ", job_id)
print("Parameters: ", parameters)
assert parameters["action"] == "completed"
assert parameters["number"] == 123
assert parameters["array of strings"] == ["value_1", "value_2"]
assert parameters["array of integers"] == [0, 1, 2, 3, 4, 5, 6]
assert parameters["source_path"] is not None
assert parameters["destination_path"] is not None
assert parameters["requirements"] == parameters["source_path"]
for i in range(0, 25):
if handle_callback.is_stopped():
handle_callback.set_job_status("stopped")
return { "destination_paths": ["/path/to/generated/file.ext"] }
time.sleep(1)
handle_callback.publish_job_progression(i * 4)
return {
"destination_paths": ["/path/to/generated/file.ext"]
}