Skip to main content

Crate hatchet_sdk

Crate hatchet_sdk 

Source
Expand description

§🪓 Hatchet SDK for Rust

This is an unofficial Rust SDK for Hatchet, a distributed, fault-tolerant task queue. This crate allows you to integrate Hatchet into your Rust applications.

§Setup

§Install protoc

This crate uses tonic to generate gRPC client stubs from Hatchet’s protobuf files. To build the library, you’ll need to install the Protocol Buffer Compiler (protoc). See the installation instructions for your operating system.

§Add crate to dependencies

Add the SDK as a dependency to your Rust project with Cargo:

cargo add hatchet-sdk

§Hatchet authentication

We recommend adding your Hatchet API token to a .env file and installing dotenvy to load it in your application for local development.

§Hatchet Version Compatibility

This library is tested against the following Hatchet versions:

VersionCompatible
v0.67.0
v0.68.0
v0.69.0
v0.70.0
v0.71.0
v0.72.0
v0.73.0
v0.74.0
v0.75.0
v0.77.0
v0.78.0
v0.79.0
v0.80.0
v0.81.0
v0.82.0
v0.83.0

§Declaring Your First Task

§Defining a task

Start by declaring a task with a name. The task object can be built with optional configuration options. Tasks have input and output types, which should implement the Serialize and Deserialize traits from serde for JSON serialization and deserialization.

§Running a task

With your task defined, you can import it wherever you need to use it and invoke it with the run method.

NOTE: You must first register the task on a worker before you can run it.
use hatchet_sdk::anyhow;
use hatchet_sdk::serde::*;
use hatchet_sdk::tokio;
use hatchet_sdk::{Context, Hatchet, Runnable, TriggerWorkflowOptionsBuilder};

#[derive(Serialize, Deserialize)]
#[serde(crate = "hatchet_sdk::serde")]
struct SimpleInput {
    pub message: String,
}

#[derive(Serialize, Deserialize)]
#[serde(crate = "hatchet_sdk::serde")]
struct SimpleOutput {
    pub transformed_message: String,
}

async fn simple_task_func(input: SimpleInput, ctx: Context) -> anyhow::Result<SimpleOutput> {
    ctx.log("Starting simple task").await?;
    Ok(SimpleOutput {
        transformed_message: input.message.to_lowercase(),
    })
}

#[tokio::main]
async fn main() {
    dotenvy::dotenv().ok();

    let hatchet: Hatchet = Hatchet::from_env().await.unwrap();

    let simple_task = hatchet
        .task("simple-task", simple_task_func)
        .build()
        .unwrap();

    let input = SimpleInput {
        message: String::from("Hello, world!"),
    };

    let options = TriggerWorkflowOptionsBuilder::default()
        .additional_metadata(Some(serde_json::json!({
            "environment": "dev",
        })))
        .build()
        .unwrap();

    // Run the task asynchronously, immediately returning the run ID
    let _run_id = simple_task.run_no_wait(&input, Some(&options)).await.unwrap();
    // Run the task synchronously, waiting for a worker to complete it and return the result
    let result = simple_task.run(&input, Some(&options)).await.unwrap();
    println!("Result: {}", result.transformed_message);
}

§Workers

Workers are responsible for executing individual tasks.

§Declaring a Worker

Declare a worker by calling the worker method on the Hatchet client. Tasks and workflows can be added to the worker. When the worker starts it will register the tasks with the Hatchet engine, allowing them to be triggered and assigned.

use hatchet_sdk::{Context, EmptyModel, Hatchet, Register, anyhow, serde_json, tokio};

#[tokio::main]
async fn main() {
    dotenvy::dotenv().ok();
    let hatchet = Hatchet::from_env().await.unwrap();

    async fn simple_task_func(
        _input: EmptyModel,
        ctx: Context,
    ) -> anyhow::Result<serde_json::Value> {
        ctx.log("Starting simple task").await?;
        Ok(serde_json::json!({"message": "success"}))
    }

    let simple_task = hatchet
        .task("simple-task", simple_task_func)
        .build()
        .unwrap();

    hatchet
        .worker("example-worker")
        .build()
        .unwrap()
        .add_task_or_workflow(&simple_task)
        .start()
        .await
        .unwrap();
}

§Declarative Workflow Design (DAGs)

Hatchet workflows are designed in a Directed Acyclic Graph (DAG) format, where each task is a node in the graph, and the dependencies between tasks are the edges.

§Building a DAG with Task Dependencies

The power of Hatchet’s workflow design comes from connecting tasks into a DAG structure. Tasks can specify dependencies (parents) which must complete successfully before the task can start.

§Running a Workflow

You can run workflows directly or enqueue them for asynchronous execution.

use hatchet_sdk::serde::{Deserialize, Serialize};
use hatchet_sdk::{Context, EmptyModel, Hatchet, Runnable, anyhow, serde_json, tokio};

#[derive(Serialize, Deserialize)]
#[serde(crate = "hatchet_sdk::serde")]
struct FirstTaskOutput {
    output: String,
}

#[derive(Serialize, Deserialize)]
#[serde(crate = "hatchet_sdk::serde")]
struct SecondTaskOutput {
    first_step_result: String,
    final_result: String,
}

#[derive(Serialize, Deserialize)]
#[serde(crate = "hatchet_sdk::serde")]
pub struct WorkflowOutput {
    first_task: FirstTaskOutput,
    second_task: SecondTaskOutput,
}

#[tokio::main]
async fn main() {
    dotenvy::dotenv().ok();

    let hatchet = Hatchet::from_env().await.unwrap();

    let first_task = hatchet
        .task(
            "first_task",
            async move |_input: EmptyModel, _ctx: Context| -> anyhow::Result<FirstTaskOutput> {
                Ok(FirstTaskOutput {
                    output: "Hello World".to_string(),
                })
            },
        )
        .build()
        .unwrap();

    let second_task = hatchet
        .task(
            "second_task",
            async move |_input: EmptyModel, ctx: Context| -> anyhow::Result<SecondTaskOutput> {
                let first_result = ctx.parent_output("first_task").await?;
                Ok(SecondTaskOutput {
                    first_step_result: first_result.get("output").unwrap().to_string(),
                    final_result: "Completed".to_string(),
                })
            },
        )
        .build()
        .unwrap()
        .add_parent(&first_task);

    let workflow = hatchet
        .workflow::<EmptyModel, WorkflowOutput>("dag-workflow")
        .build()
        .unwrap()
        .add_task(&first_task)
        .add_task(&second_task);

    // Run the workflow asynchronously, immediately returning the run ID
    let _run_id = workflow.run_no_wait(&EmptyModel, None).await.unwrap();
    // Run the workflow synchronously, waiting for a worker to complete it and return the result
    let result = workflow.run(&EmptyModel, None).await.unwrap();
    println!(
        "First task result: {}",
        serde_json::to_string(&result.first_task).unwrap()
    );
    println!(
        "Second task result: {}",
        serde_json::to_string(&result.second_task).unwrap()
    );
}

Re-exports§

pub use context::Context;
pub use error::HatchetError;
pub use runnables::Runnable;
pub use runnables::Task;
pub use runnables::TriggerWorkflowOptionsBuilder;
pub use runnables::Workflow;
pub use utils::EmptyModel;
pub use worker::Register;
pub use worker::Worker;
pub use chrono;
pub use serde;
pub use serde_json;
pub use tokio;

Modules§

anyhow
config
context
error
runnables
utils
worker

Structs§

CreateCronOpts
Options for creating a cron trigger via CronsClient::create.
CreateScheduleOpts
Options for creating a scheduled run via SchedulesClient::create.
CronOptions
Optional parameters for Task::cron and Workflow::cron.
CronTrigger
A cron workflow trigger returned by the Hatchet API.
CronTriggerList
Paginated list of cron triggers returned by CronsClient::list.
CronsClient
Client for managing cron workflow triggers. Accessed via Hatchet::crons.
Hatchet
The main client for interacting with the Hatchet API.
ListCronsOpts
Filter and pagination options for CronsClient::list.
ListSchedulesOpts
Filter and pagination options for SchedulesClient::list.
PaginationResponse
Pagination metadata returned by list endpoints.
RunsClient
The runs client is a client for interacting with task and workflow runs within Hatchet.
ScheduleOptions
Optional parameters for Task::schedule and Workflow::schedule.
ScheduledRun
A scheduled workflow run returned by the Hatchet API. The run will be enqueued at trigger_at.
ScheduledRunList
Paginated list of scheduled runs returned by SchedulesClient::list.
SchedulesClient
Client for managing scheduled (one-time) workflow runs. Accessed via Hatchet::schedules.