mahler is an automated job orchestration library that builds and executes dynamic workflows.
The library uses automated planning (heavily based on Hierarchical Task Networks, or HTNs) to compose user-defined jobs into a workflow (represented as a directed acyclic graph, or DAG) that achieves a desired target system state.
This library’s API is heavily inspired by Axum’s.
§Features
- Simple API - system state is defined as Rust structs with the help of the provided derive crate. Allowed tasks are defined as pure Rust functions acting on part of the system state or on the whole of it.
- State engine with integrated planner - tasks are configured as jobs in a Worker domain. When given a new target state, the worker determines the changes needed to reach it and searches for a workflow that leads from the current state to the target.
- Concurrent execution - the internal planner detects when tasks can run concurrently based on state paths
- Automatic re-planning - re-computes the workflow when runtime conditions change
- Observable runtime - monitor the evolving state of the system from the Worker API. For more detailed logging, the library uses the tracing crate.
- Easy to debug - the worker's observable state and known goals make issues easy to reproduce when they occur.
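One way to picture path-based concurrency detection is a conflict check on JSON pointers: two tasks can safely run at the same time when neither path is the other or one of its ancestors. The std-only sketch below assumes exactly that rule; `tokens`, `is_ancestor_or_self` and `can_run_concurrently` are hypothetical names, and mahler's actual planner may take more than paths into account.

```rust
/// Split a JSON pointer into its reference tokens ("" is the root and has none).
fn tokens(pointer: &str) -> Vec<&str> {
    pointer
        .strip_prefix('/')
        .map(|rest| rest.split('/').collect::<Vec<_>>())
        .unwrap_or_default()
}

/// True if `a` is the same path as `b` or one of its ancestors,
/// compared token by token (so "/a" is not an ancestor of "/ab").
pub fn is_ancestor_or_self(a: &str, b: &str) -> bool {
    let (ta, tb) = (tokens(a), tokens(b));
    tb.starts_with(&ta)
}

/// Two tasks may run concurrently if neither path contains the other.
pub fn can_run_concurrently(a: &str, b: &str) -> bool {
    !is_ancestor_or_self(a, b) && !is_ancestor_or_self(b, a)
}
```

Under this rule, tasks on `/a/b` and `/a/c` can run in parallel, while a task on the root path (such as one using the `System` extractor) conflicts with every other task.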
§Worker
A Worker orchestrates jobs into a workflow and executes the workflow tasks when given a target. The worker also manages the system state model, making changes as the workflow is executed.
When creating a Worker, the first step is to assign jobs to specific routes and operations within the system state. Once the jobs are set up, providing the worker with an initial state makes the worker ready to operate on the system.
The only way to control the worker and effect changes on the controlled system is to provide the worker with a target state.
use mahler::state::State;
use mahler::worker::Worker;
use mahler::job::{create, update};
#[derive(State)]
struct MySystem;
fn global() {}
fn foo() {}
fn foo_bar() {}
let mut worker = Worker::new()
// assign possible jobs to worker
.job("", update(global))
.job("/{foo}", update(foo))
.job("/{foo}/{bar}", create(foo_bar))
// initialize the worker state
.initial_state(MySystem {/* .. */})?;
// Control the system by providing a new target state
let (state, workflow_status) = worker.seek_target(MySystemTarget { /* .. */ }).await?;
When comparing the internal state with the target, the Worker's planner generates a list of differences between the states (see JSON Patch) and tries to find a plan using the jobs that are applicable to each changed path.
For instance, in the worker defined above, if the target introduces a new value at the path /a/b,
the planner will try jobs from the most general to the most specific, meaning it will try
- global
- foo
- foo_bar
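The ordering above can be illustrated with a small std-only sketch that matches route templates such as `/{foo}/{bar}` against the changed path and each of its ancestors, from the root down. `match_route` and `candidates` are hypothetical helpers written for this illustration, not part of mahler's API.

```rust
use std::collections::HashMap;

/// Split a JSON pointer into its reference tokens ("" is the root).
fn tokens(pointer: &str) -> Vec<&str> {
    pointer
        .strip_prefix('/')
        .map(|rest| rest.split('/').collect::<Vec<_>>())
        .unwrap_or_default()
}

/// Match a route template such as "/{foo}/{bar}" against a concrete path,
/// returning the captured path arguments on success.
pub fn match_route(route: &str, path: &str) -> Option<HashMap<String, String>> {
    let (route_tokens, path_tokens) = (tokens(route), tokens(path));
    if route_tokens.len() != path_tokens.len() {
        return None;
    }
    let mut args = HashMap::new();
    for (r, p) in route_tokens.iter().zip(path_tokens.iter()) {
        match r.strip_prefix('{').and_then(|s| s.strip_suffix('}')) {
            // a `{name}` segment captures whatever token is at this position
            Some(name) => {
                args.insert(name.to_string(), p.to_string());
            }
            // a literal segment must match exactly
            None if r != p => return None,
            None => {}
        }
    }
    Some(args)
}

/// List the routes that apply to `path` or one of its ancestors,
/// ordered from the most general (root) to the most specific.
pub fn candidates<'a>(routes: &[&'a str], path: &str) -> Vec<&'a str> {
    let path_tokens = tokens(path);
    let mut out = Vec::new();
    for depth in 0..=path_tokens.len() {
        // rebuild the ancestor pointer at this depth: "", "/a", "/a/b", ...
        let ancestor: String = path_tokens[..depth].iter().map(|t| format!("/{t}")).collect();
        for route in routes {
            if match_route(route, &ancestor).is_some() {
                out.push(*route);
            }
        }
    }
    out
}
```

With the routes `""`, `"/{foo}"` and `"/{foo}/{bar}"`, a change at `/a/b` yields them in that order, and the last match captures `foo = "a"` and `bar = "b"` as path arguments.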
§Operations
Jobs may be assigned to the operations create (add), update (replace), delete (remove), any, and none,
meaning they may be selected when a property is created, updated, or removed from the system
state. A task assigned to none is never selected by the planner directly, but may be used as part of
compound tasks. Because all potentially runnable jobs need to be linked to the worker, none jobs
are needed to register such tasks.
See Operation for more information.
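As an illustration of how job operations relate to JSON Patch operations, the sketch below uses a hypothetical `JobOperation` enum (a stand-in, not mahler's `Operation` type) to decide whether a job may be selected for a given kind of change. It assumes that `any` matches every patch operation and that `none` matches nothing.

```rust
/// JSON Patch (RFC 6902) operation kinds relevant to job selection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PatchOp {
    Add,
    Replace,
    Remove,
}

/// Hypothetical stand-in for the operations a job can be assigned to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobOperation {
    Create,
    Update,
    Delete,
    Any,
    None,
}

impl JobOperation {
    /// Whether a job registered for `self` may be selected for a change of kind `op`.
    pub fn matches(self, op: PatchOp) -> bool {
        match (self, op) {
            (JobOperation::Any, _) => true,
            // `none` jobs are only reachable through compound tasks
            (JobOperation::None, _) => false,
            (JobOperation::Create, PatchOp::Add)
            | (JobOperation::Update, PatchOp::Replace)
            | (JobOperation::Delete, PatchOp::Remove) => true,
            _ => false,
        }
    }
}
```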
§State
The library relies internally on JSON values for state representation. Parts of the state can be referenced using JSON pointers (see RFC 6901) and state differences are calculated using JSON patch (see RFC 6902).
For this reason, the system state can only be modelled using serializable data structures.
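For reference, RFC 6901 pointers are simple to decode: a pointer is either empty (the whole document) or a sequence of `/`-prefixed tokens in which `~1` stands for `/` and `~0` for `~`. The std-only sketch below, using a hypothetical `parse_pointer` function, shows the unescaping order the RFC requires; it is not mahler's internal implementation.

```rust
/// Split an RFC 6901 JSON pointer into unescaped reference tokens.
/// Returns None if the pointer is neither empty nor starts with '/'.
pub fn parse_pointer(pointer: &str) -> Option<Vec<String>> {
    if pointer.is_empty() {
        // "" refers to the whole document
        return Some(Vec::new());
    }
    let rest = pointer.strip_prefix('/')?;
    Some(
        rest.split('/')
            // decode "~1" first, then "~0", as the RFC requires
            .map(|t| t.replace("~1", "/").replace("~0", "~"))
            .collect(),
    )
}
```

Decoding `~1` before `~0` matters: with the opposite order, the token `~01` would wrongly decode to `/` instead of `~1`.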
The worker and planner make a distinction between the internal state of the system and what
can be used as target state. For instance, the start date of a process is good reference
information, but it doesn't make a good target. When modelling state, internal state can be
annotated using #[mahler(internal)], which means these properties will not be used in the
comparison with the target state.
use std::time::SystemTime;
use mahler::state::{State, List};
// the `State` macro implements `Serialize` and `Deserialize` for
// the struct and creates an associated type `ServiceTarget` without
// any internal properties
#[derive(State)]
struct Service {
// will be used when planning
cmd: List<String>,
// will not be used for planning
#[mahler(internal)]
start_time: SystemTime,
}
For accessing read-only, non-serializable resources from tasks, Mahler provides a separate mechanism.
§Shared resources
Sometimes it may be desirable for jobs to access a shared resource (e.g. a database connection,
a file descriptor, etc.). These can be provided when creating the worker, the only
restriction being that these structures must be Send and Sync.
use mahler::state::State;
use mahler::extract::Res;
use mahler::job::update;
use mahler::worker::Worker;
// the system state
#[derive(State)]
struct MySystem;
// MyConnection represents a shared resource
struct MyConnection;
// Tasks can make use of resources via the `Res` extractor
fn some_task(conn: Res<MyConnection>) {}
let conn = MyConnection {/* .. */};
let worker = Worker::new()
.resource::<MyConnection>(conn)
.job("/", update(some_task))
.initial_state(MySystem {/* .. */})?;
Note that only one resource of each type can be provided to the worker.
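The one-resource-per-type rule follows naturally if resources are stored in a map keyed by their type. The sketch below assumes such a design, using a hypothetical `Resources` type built on std's `TypeId` and `Any`; mahler's actual storage may differ. The `Send + Sync` bounds mirror the restriction described above.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical resource store: at most one value per concrete type.
#[derive(Default)]
pub struct Resources {
    map: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
}

impl Resources {
    /// Store `value`, returning the previous resource of the same type, if any.
    pub fn insert<T: Any + Send + Sync>(&mut self, value: T) -> Option<Arc<dyn Any + Send + Sync>> {
        self.map.insert(TypeId::of::<T>(), Arc::new(value))
    }

    /// Fetch a shared handle to the resource of type `T`.
    pub fn get<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
        self.map
            .get(&TypeId::of::<T>())
            .cloned()
            .and_then(|r| r.downcast::<T>().ok())
    }
}
```

Inserting a second value of the same type replaces the first, which is one way to enforce a single resource per type.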
§Tasks and Jobs
A Task in Mahler is an operation on a part of the system state that may choose to make changes to the state given some target. It is defined as a pure Rust handler, which may or may not perform IO. A Job is the assignment of a task to an operation on the system state.
A task is defined via a Handler and is applied to a specific Context, which is composed of an application path (a JSON pointer to a part of the system state), an optional target and zero or more path arguments.
A handler in mahler is any function that accepts zero or more “extractors” as arguments and returns something that can be converted into an effect on the system.
§Extractors
An extractor is a type that implements FromSystem. Extractors are how the planning/execution context is passed to the handler.
use mahler::state::State;
use mahler::extract::{View, Args, Target, System, Res};
struct MyConnection;
#[derive(State)]
struct MySystem;
// `View` provides a view into the relevant part of the
// state for the handler and allows making changes to the state.
fn view(state: View<u32>) {}
// For nullable values, use `View<Option<T>>`
// for instance, in the case of `create` operations
fn nullable_view(state: View<Option<u32>>) {}
// `Args` gives you the path arguments and deserializes them
fn args(Args(counter_name): Args<String>) {}
// `Target` gives you the target value for the Job operation
// note that `delete` operations do not have a target.
fn target(Target(tgt): Target<u32>) {}
// `System` provides a view into the top level system state.
// A Job using the System extractor cannot run concurrently to other jobs
fn system(System(state): System<MySystem>) {}
// `Res` allows to access a shared resource
fn res(res: Res<MyConnection>) {}
For extractors with generic type parameters, using a type that cannot be deserialized from the internal worker state results in an Error. This means the task cannot be used by the planner, which results in a warning (or a failure in debug builds).
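Since the API is inspired by Axum, the extractor mechanism can be pictured the same way: a trait builds each argument from the context, and a `Handler` trait is implemented for every function whose arguments all implement it. The sketch below is a heavily simplified, synchronous illustration; `Context`, `FromContext`, `Target`, `Args` and `Handler` here are hypothetical stand-ins, not mahler's `FromSystem` or `Handler` definitions.

```rust
/// Hypothetical planning context: an optional target and the path arguments.
pub struct Context {
    pub target: Option<u32>,
    pub args: Vec<String>,
}

/// Analogue of an extractor trait: build a value from the context, or fail.
pub trait FromContext: Sized {
    fn from_context(ctx: &Context) -> Result<Self, String>;
}

pub struct Target(pub u32);
pub struct Args(pub String);

impl FromContext for Target {
    fn from_context(ctx: &Context) -> Result<Self, String> {
        ctx.target.map(Target).ok_or_else(|| "no target available".to_string())
    }
}

impl FromContext for Args {
    fn from_context(ctx: &Context) -> Result<Self, String> {
        ctx.args.first().cloned().map(Args).ok_or_else(|| "missing path argument".to_string())
    }
}

/// Implemented for functions whose arguments are all extractors.
pub trait Handler<T, R> {
    fn call(&self, ctx: &Context) -> Result<R, String>;
}

impl<F, A, R> Handler<(A,), R> for F
where
    F: Fn(A) -> R,
    A: FromContext,
{
    fn call(&self, ctx: &Context) -> Result<R, String> {
        Ok(self(A::from_context(ctx)?))
    }
}

impl<F, A, B, R> Handler<(A, B), R> for F
where
    F: Fn(A, B) -> R,
    A: FromContext,
    B: FromContext,
{
    fn call(&self, ctx: &Context) -> Result<R, String> {
        // every extractor must succeed before the function is invoked
        Ok(self(A::from_context(ctx)?, B::from_context(ctx)?))
    }
}
```

A handler such as `fn label(Target(t): Target, Args(name): Args) -> String` can then be called with `label.call(&ctx)`; if an extractor cannot be built from the context, the call fails with an error instead of running the function.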
§Modifying the system state
The View extractor provides a mechanism to modify the system state by returning the modified view.
use mahler::extract::{View, Target};
// create a task to update a counter
fn plus_one(mut counter: View<u32>, Target(tgt): Target<u32>) -> Option<View<u32>> {
// `View` implements Deref and DerefMut to
// operate on the internal value
if *counter >= tgt {
// condition failed, abort the task
return None;
}
*counter += 1;
Some(counter)
}
// remove the counter
fn delete_counter(mut view: View<Option<u32>>) -> View<Option<u32>> {
// if the counter has not changed (i.e., it is already None), the task will be ignored
// by the planner (unless it is used in a method)
view.take();
view
}
Internally, the planner converts the cumulative changes to the View extractors into a
Patch and uses it to determine the applicability of the task to a given target. At runtime,
the same patch is used first to determine whether the task is safe to apply, and later to
update the internal worker state.
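To make the patch idea concrete, the sketch below diffs two flattened states (maps from JSON pointer to value) into RFC 6902-style add, replace and remove operations. The flat map and the `diff` function are simplifications assumed for illustration; real JSON Patch generation also has to handle nested objects and arrays.

```rust
use std::collections::BTreeMap;

/// A simplified RFC 6902 operation over integer values.
#[derive(Debug, Clone, PartialEq)]
pub enum PatchOp {
    Add { path: String, value: i64 },
    Replace { path: String, value: i64 },
    Remove { path: String },
}

/// Compare two flattened states (pointer -> value) and list the operations
/// that turn `before` into `after`, in path order.
pub fn diff(before: &BTreeMap<String, i64>, after: &BTreeMap<String, i64>) -> Vec<PatchOp> {
    let mut ops = Vec::new();
    for (path, old) in before {
        match after.get(path) {
            None => ops.push(PatchOp::Remove { path: path.clone() }),
            Some(new) if new != old => ops.push(PatchOp::Replace { path: path.clone(), value: *new }),
            Some(_) => {} // unchanged
        }
    }
    for (path, new) in after {
        if !before.contains_key(path) {
            ops.push(PatchOp::Add { path: path.clone(), value: *new });
        }
    }
    ops
}
```

An empty diff corresponds to a task that changes nothing, which is why the planner can ignore such a task.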
§System Effects (I/O)
In mahler, the task handler needs to be executed in two different contexts:
- At planning, the context for the job is determined (current state, path, target) and the corresponding task is tested to simulate the changes it introduces without actually modifying the underlying system. The same job may be tested multiple times while planning.
- At runtime, the tasks composing the workflow are executed in the corresponding order determined by the planner and changes to the underlying system are performed.
This two-in-one evaluation is enabled by the IO type. An IO combines a pure operation on an
input with an effectful (IO) operation, and it is the type mahler jobs use to isolate effects
on the underlying system.
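The idea behind this two-in-one evaluation can be illustrated with a hypothetical, synchronous `Io` type: the planned value is available without running the effect, and the effect only runs when the value is consumed at runtime. This std-only sketch is not mahler's IO type, which is asynchronous and built with with_io as in the example that follows.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical two-phase value: `planned` is computed purely,
/// `effect` runs only when the value is consumed at runtime.
pub struct Io<T> {
    planned: T,
    effect: Box<dyn FnOnce(T) -> Result<T, String>>,
}

impl<T: Clone> Io<T> {
    pub fn new(planned: T, effect: impl FnOnce(T) -> Result<T, String> + 'static) -> Self {
        Io { planned, effect: Box::new(effect) }
    }

    /// Planning: inspect the expected result without running the effect.
    pub fn plan(&self) -> T {
        self.planned.clone()
    }

    /// Runtime: consume the value and perform the effect.
    pub fn run(self) -> Result<T, String> {
        (self.effect)(self.planned)
    }
}

/// Simulated counter increment; `effects` counts how often the "I/O" actually runs.
pub fn plus_one(counter: u32, tgt: u32, effects: Rc<Cell<u32>>) -> Option<Io<u32>> {
    if counter >= tgt {
        return None; // condition not met, similar in spirit to `enforce!`
    }
    Some(Io::new(counter + 1, move |c| {
        effects.set(effects.get() + 1); // stands in for real I/O
        Ok(c)
    }))
}
```

The planner can call `plan()` as often as it likes without triggering the effect; only `run()` performs it.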
use tokio::time::{sleep, Duration};
use mahler::extract::{View, Target};
use mahler::task::{with_io, IO, enforce};
// create a Job to update a counter
fn plus_one(mut counter: View<u32>, Target(tgt): Target<u32>) -> IO<u32> {
// abort the task if the counter is over the target
enforce!(*counter < tgt);
// update the counter if below the target
*counter += 1;
// return an IO type to isolate system changes
with_io(counter, |counter| async move {
// the IO portion will only ever be called if the job is
// selected by the planner
// perform IO here
sleep(Duration::from_millis(10)).await;
// with_io expects a Result output
Ok(counter)
})
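}
The pure part of a handler runs every time the planner tests the task, so IO should not be performed outside of with_io. The following handler is an example of what to avoid:
use tokio::time::{sleep, Duration};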
use tokio::runtime::Runtime;
use mahler::extract::{View, Target};
fn plus_one(mut counter: View<u32>, Target(tgt): Target<u32>) -> View<u32> {
if *counter < tgt {
// update the counter if below the target
*counter += 1;
}
Runtime::new().unwrap().block_on(async {
// This is a footgun as it adds 100ms every time
// the task is tested by the planner
sleep(Duration::from_millis(100)).await;
});
counter
}
§Compound tasks
Sometimes it may be desirable to re-use tasks in different contexts, or combine multiple tasks in order to guide the planner. This can be achieved by the use of compound tasks, called Methods in the Mahler API.
A Method handler is any function that receives zero or more extractors and
returns something that can be converted to a Vec of tasks.
use mahler::extract::{View, Target};
use mahler::task::{Task, Handler};
fn plus_one() {/* .. */}
// define a method
fn plus_two(counter: View<i32>, Target(tgt): Target<i32>) -> Vec<Task> {
if tgt - *counter > 1 {
// Return two instances of the `plus_one` task
return vec![
// Provide a target for the task.
// `with_target` assigns a target to the task
plus_one.with_target(tgt),
plus_one.with_target(tgt)
];
}
// returning nothing means the method will not be picked
// by the planner
vec![]
}
// methods can also call other methods
fn plus_three(counter: View<i32>, Target(tgt): Target<i32>) -> Vec<Task> {
if tgt - *counter > 2 {
return vec![plus_two.with_target(tgt), plus_one.with_target(tgt)];
}
vec![]
}
A task Handler may be converted to a Task using the into_task method or one of the helper methods with_target or with_arg.
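The way methods expand into primitive tasks can be sketched as a recursive decomposition in the style of HTN planning. The `Task` enum and `expand` function below are hypothetical and simplified (a single integer counter as state, no backtracking), but they reproduce the plus_one, plus_two and plus_three example above.

```rust
/// Hypothetical task representation: primitive tasks change the state,
/// methods return subtasks (an empty list means "not applicable").
#[derive(Clone, Copy)]
pub enum Task {
    Primitive(&'static str, fn(i32, i32) -> Option<i32>),
    Method(&'static str, fn(i32, i32) -> Vec<Task>),
}

pub fn plus_one(counter: i32, tgt: i32) -> Option<i32> {
    if counter < tgt { Some(counter + 1) } else { None }
}

pub fn plus_two(counter: i32, tgt: i32) -> Vec<Task> {
    if tgt - counter > 1 { vec![Task::Primitive("plus_one", plus_one); 2] } else { vec![] }
}

pub fn plus_three(counter: i32, tgt: i32) -> Vec<Task> {
    if tgt - counter > 2 {
        vec![Task::Method("plus_two", plus_two), Task::Primitive("plus_one", plus_one)]
    } else {
        vec![]
    }
}

/// Expand `task` from `counter` towards `tgt`, appending primitive task names
/// to `plan`. Returns the resulting state, or None if the task does not apply.
pub fn expand(task: Task, counter: i32, tgt: i32, plan: &mut Vec<&'static str>) -> Option<i32> {
    match task {
        Task::Primitive(name, apply) => {
            let next = apply(counter, tgt)?;
            plan.push(name);
            Some(next)
        }
        Task::Method(_, method) => {
            let subtasks = method(counter, tgt);
            if subtasks.is_empty() {
                return None;
            }
            // expand into a scratch plan so a failed method leaves `plan` untouched
            let mut local = Vec::new();
            let mut state = counter;
            for subtask in subtasks {
                state = expand(subtask, state, tgt, &mut local)?;
            }
            plan.extend(local);
            Some(state)
        }
    }
}
```

Expanding plus_three from 0 towards a target of 3 yields three plus_one steps, while from 1 the method does not apply.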
§Error handling
All errors that Mahler operations may produce are defined by the ErrorKind type.
When calling Worker::seek_target, the worker will return an
error if there is a serialization problem, if an error happens during planning, or if there is
an internal error. If an error happens during workflow execution, the method does not return an
error; instead it terminates with an Aborted value, which includes
a Vec of all errors that happened during the workflow execution, each of either the
ErrorKind::Runtime or the
ErrorKind::ConditionNotMet type.
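A caller therefore deals with two levels of failure: an Err from seek_target, and an Aborted status carrying the execution errors. The sketch below models that shape with hypothetical `SeekError`, `TaskError` and `WorkflowStatus` types (including an assumed `Success` variant); it does not reproduce mahler's actual error types.

```rust
/// Errors that prevent the workflow from being found or started (hypothetical).
#[derive(Debug)]
pub enum SeekError {
    Serialization(String),
    Planning(String),
    Internal(String),
}

/// Errors collected while the workflow runs (hypothetical).
#[derive(Debug, Clone, PartialEq)]
pub enum TaskError {
    Runtime(String),
    ConditionNotMet(String),
}

/// Outcome of a workflow run that did start (hypothetical).
#[derive(Debug)]
pub enum WorkflowStatus {
    Success,
    Aborted(Vec<TaskError>),
}

/// Summarize both failure levels for a log line.
pub fn describe(result: Result<WorkflowStatus, SeekError>) -> String {
    match result {
        Err(e) => format!("could not start: {e:?}"),
        Ok(WorkflowStatus::Success) => "target reached".to_string(),
        Ok(WorkflowStatus::Aborted(errors)) => {
            let unmet = errors
                .iter()
                .filter(|e| matches!(e, TaskError::ConditionNotMet(_)))
                .count();
            format!("aborted with {} error(s), {} unmet condition(s)", errors.len(), unmet)
        }
    }
}
```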
§Observability
The Worker::seek_target method is instrumented using the tracing crate to report on the operation and progress of the Worker planning and workflow execution. These events can be processed using the tracing_subscriber crate to produce structured or human readable logs.
For additional control and a more granular view of system changes during Worker operation, the find_workflow and run_workflow Worker methods can be used.
- find_workflow looks up a workflow to a given target.
- run_workflow consumes the worker and the given workflow and returns a stream of events, each indicating either a new system state or the completion of the workflow execution. Note that if the worker has sensors defined, the stream will continue producing values even after the workflow completes.
Similarly, to only monitor the system without producing changes, the Worker exposes a listen method that produces a stream of values from its sensors (if any are defined).
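The event stream returned by run_workflow can be pictured with a synchronous analogue: a background thread applies workflow steps and sends each new state, followed by a completion event, over a channel. The `Event` enum and `run_steps` function are hypothetical; mahler's stream is asynchronous and its event types may differ.

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Debug, PartialEq)]
pub enum Event {
    State(u32),
    Completed,
}

/// Hypothetical stand-in for a workflow run: applies `steps` increments to `start`,
/// emitting the new state after each step, then a completion event.
pub fn run_steps(start: u32, steps: u32) -> mpsc::Receiver<Event> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut state = start;
        for _ in 0..steps {
            state += 1;
            if tx.send(Event::State(state)).is_err() {
                return; // the receiver was dropped; stop early
            }
        }
        let _ = tx.send(Event::Completed);
        // dropping `tx` here ends the receiver's iterator
    });
    rx
}
```

Iterating over the receiver yields the intermediate states in order and ends after the completion event, once the sender is dropped.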
§Testing
The find_workflow and run_task methods of Worker can also be used for testing: respectively, to check the worker configuration by comparing the generated plans against some expectations, and to check the implementation of individual tasks.
The library exposes the following workflow types and macros for this purpose:
- Dag - a DAG implementation used internally by mahler.
- dag - a declarative macro to combine DAGs into branches
- seq - a declarative macro to create a linear DAG from a list of values
- par - a declarative macro to create a branching DAG with single-value branches
use mahler::task::{IO, with_io, enforce};
use mahler::extract::{View, Target};
use mahler::worker::Worker;
use mahler::dag::{Dag, seq};
use mahler::job::update;
fn plus_one(mut counter: View<i32>, Target(tgt): Target<i32>) -> IO<i32> {
enforce!(*counter < tgt);
*counter += 1;
// Return the updated counter
with_io(counter, |counter| async {
Ok(counter)
})
}
// Setup the worker domain and resources
let worker = Worker::new()
.job("", update(plus_one).with_description(|| "+1"))
.initial_state(0).unwrap();
let workflow = worker.find_workflow(2).unwrap().unwrap();
// We expect a linear DAG with two tasks
let expected: Dag<&str> = seq!("+1", "+1");
assert_eq!(workflow.to_string(), expected.to_string());
Modules§
- dag
- Directed Acyclic Graph implementation and methods
- error
- Error handling types
- exception
- Types for defining planning exceptions
- extract
- Types and traits for extracting task runtime context
- job
- Types for creating and manipulating Worker Jobs
- json
- JSON referencing and manipulation
- result
- Alias over core::result::Result
- sensor
- Types for defining sensors to monitor the system state
- serde
- Re-export of serde_core for use by mahler-derive
- state
- State trait and standardized serialization along with State procedural macro
- sync
- State synchronization and runtime control
- task
- Types and traits for declaring and operating with Tasks
- worker
- Automated planning and execution of task workflows
Macros§
- enforce
- Enforces a condition and returns early with an aborted IO if the condition is false.