runtara-sdk

High-level SDK for building durable workflows with Runtara. Provides checkpointing, signal handling, and crash recovery for long-running processes.
Overview
The Runtara SDK enables building crash-resilient workflows by providing:
- Checkpointing: Automatically save workflow state to PostgreSQL for crash recovery
- Signal Handling: Respond to cancel, pause, and resume signals
- Durable Sleep: Long sleeps persist across process restarts
- Progress Tracking: Report workflow progress to the execution engine
- #[durable] Macro: Transparent durability for async functions
Installation
Add to your Cargo.toml:
[dependencies]
runtara-sdk = "1.0"
Usage
Basic Workflow
use runtara_sdk::RuntaraSdk;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to a local Runtara server and register this instance.
    let mut sdk = RuntaraSdk::localhost("instance-id", "tenant-id")?;
    sdk.connect().await?;
    sdk.register(None).await?;

    for i in 0..10 {
        // Checkpoint before each step; the result also carries pending signals.
        let state = serde_json::to_vec(&i)?;
        let result = sdk.checkpoint(&format!("step-{}", i), &state).await?;

        if result.should_cancel() {
            return Err("Cancelled".into());
        }
        if result.should_pause() {
            sdk.suspended().await?;
            return Ok(());
        }
        // State already exists for this step: it ran before a restart, so skip it.
        if result.existing_state().is_some() {
            continue;
        }

        println!("Processing step {}", i);
    }

    sdk.completed(b"done").await?;
    Ok(())
}
Checkpoint Pattern
Checkpoints handle both saving state and resuming from crashes:
use runtara_sdk::RuntaraSdk;
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize)]
struct MyState {
    processed_items: Vec<String>,
    current_index: usize,
}

async fn process_items(sdk: &mut RuntaraSdk, items: Vec<String>) -> Result<(), Box<dyn std::error::Error>> {
    let mut state = MyState {
        processed_items: vec![],
        current_index: 0,
    };

    for (i, item) in items.iter().enumerate() {
        // Save the current state under a per-item checkpoint ID.
        let checkpoint_data = serde_json::to_vec(&state)?;
        let result = sdk.checkpoint(&format!("item-{}", i), &checkpoint_data).await?;

        // If the checkpoint already exists, restore its state and skip the item.
        if let Some(existing) = result.existing_state() {
            state = serde_json::from_slice(existing)?;
            continue;
        }

        // Fresh execution: process the item and update the state.
        state.processed_items.push(item.clone());
        state.current_index = i + 1;
    }

    Ok(())
}
Using the #[durable] Macro
The #[durable] macro provides automatic checkpoint-based caching:
use runtara_sdk_macros::durable;
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize)]
struct Order {
    id: String,
    total: f64,
}

#[durable]
pub async fn fetch_order(order_id: String) -> Result<Order, Box<dyn std::error::Error>> {
    // db_fetch_order is a placeholder for your own data-access function.
    let order = db_fetch_order(&order_id).await?;
    Ok(order)
}
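The idea of checkpoint-based caching can be illustrated without the SDK. The sketch below is not the macro's actual expansion: `ResultCache`, `run_once`, and the key format are hypothetical names chosen for illustration, and an in-memory `HashMap` stands in for persistent checkpoint storage. It only shows the general pattern of running the work once and returning the stored result on later calls.

```rust
use std::collections::HashMap;

/// Hypothetical result cache standing in for checkpoint storage (not an SDK type).
struct ResultCache {
    entries: HashMap<String, String>,
}

impl ResultCache {
    fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    /// Runs `work` only if no result is stored under `key`;
    /// otherwise returns a copy of the stored result.
    fn run_once<F>(&mut self, key: &str, work: F) -> String
    where
        F: FnOnce() -> String,
    {
        if let Some(hit) = self.entries.get(key) {
            return hit.clone();
        }
        let value = work();
        self.entries.insert(key.to_string(), value.clone());
        value
    }
}

fn main() {
    let mut cache = ResultCache::new();
    let mut calls = 0;

    // Assumed key format: function name plus argument.
    let first = cache.run_once("fetch_order:42", || {
        calls += 1;
        "order-42".to_string()
    });
    // Second call with the same key: the closure is not executed again.
    let second = cache.run_once("fetch_order:42", || {
        calls += 1;
        "order-42".to_string()
    });

    assert_eq!(first, second);
    assert_eq!(calls, 1);
}
```

A durable implementation would store serialized results outside the process so that the cache survives a crash; this sketch keeps them in memory only.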
Signal Handling
Handle external control signals (cancel, pause, resume):
use runtara_sdk::RuntaraSdk;

async fn long_running_task(sdk: &mut RuntaraSdk) -> Result<(), Box<dyn std::error::Error>> {
    for _ in 0..1000 {
        // Simple check for cancellation.
        if sdk.check_cancelled().await? {
            println!("Workflow cancelled");
            return Ok(());
        }

        // Poll for any pending signal and dispatch on its type.
        if let Some(signal) = sdk.poll_signal().await? {
            match signal.signal_type.as_str() {
                "cancel" => return Ok(()),
                "pause" => {
                    sdk.acknowledge_signal(&signal.signal_id).await?;
                    sdk.suspended().await?;
                    return Ok(());
                }
                _ => {}
            }
        }

        // Do one unit of work here.
    }

    Ok(())
}
Durable Sleep
Sleep that persists across restarts:
use runtara_sdk::RuntaraSdk;
use std::time::Duration;

async fn scheduled_task(sdk: &mut RuntaraSdk) -> Result<(), Box<dyn std::error::Error>> {
    // Short sleep.
    sdk.sleep(Duration::from_secs(5)).await?;

    // Long sleep that persists across process restarts.
    sdk.sleep(Duration::from_secs(3600)).await?;

    println!("Woke up after 1 hour!");
    Ok(())
}
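One common way to make a sleep survive a restart is to persist an absolute wake-up time and, on startup, sleep only for whatever remains. The sketch below shows that arithmetic; it is an assumption about the general technique, not a description of how `sdk.sleep` is implemented, and `wake_at` and `remaining` are hypothetical helper names.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Absolute wake-up time in seconds since the Unix epoch.
/// A durable system would write this value to persistent storage.
fn wake_at(now: SystemTime, sleep_for: Duration) -> u64 {
    let since_epoch = now
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before 1970");
    (since_epoch + sleep_for).as_secs()
}

/// Time still left to sleep when the process starts (or restarts) at `now`.
/// Returns zero if the deadline has already passed.
fn remaining(now: SystemTime, wake_at_secs: u64) -> Duration {
    let now_secs = now
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before 1970")
        .as_secs();
    Duration::from_secs(wake_at_secs.saturating_sub(now_secs))
}

fn main() {
    // Fixed timestamps keep the example deterministic.
    let start = UNIX_EPOCH + Duration::from_secs(1_000);
    let deadline = wake_at(start, Duration::from_secs(3600));

    // The process restarts 600 seconds into the sleep: 3000 seconds remain.
    let restart = start + Duration::from_secs(600);
    assert_eq!(remaining(restart, deadline), Duration::from_secs(3000));

    // The process restarts after the deadline: nothing left to wait for.
    let late = start + Duration::from_secs(4000);
    assert_eq!(remaining(late, deadline), Duration::from_secs(0));
}
```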
Environment Variables
| Variable | Required | Default | Description |
|---|---|---|---|
| RUNTARA_INSTANCE_ID | Yes | - | Unique instance identifier |
| RUNTARA_TENANT_ID | Yes | - | Tenant identifier |
| RUNTARA_SERVER_ADDR | No | 127.0.0.1:8001 | Server address |
| RUNTARA_SKIP_CERT_VERIFICATION | No | false | Skip TLS verification |
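To make the required/default semantics of these variables concrete, here is a minimal sketch that reads them using only the standard library. `Settings` and `load_settings` are hypothetical names, not SDK types, and treating only the literal string "true" as enabling RUNTARA_SKIP_CERT_VERIFICATION is an assumption; the SDK may parse its environment differently.

```rust
use std::env;

/// Hypothetical settings struct mirroring the variables listed above (not an SDK type).
#[derive(Debug, PartialEq)]
struct Settings {
    instance_id: String,
    tenant_id: String,
    server_addr: String,
    skip_cert_verification: bool,
}

/// Builds `Settings` from a lookup function, so the logic can be exercised
/// without modifying the real process environment.
fn load_settings<F>(lookup: F) -> Result<Settings, String>
where
    F: Fn(&str) -> Option<String>,
{
    // Required variables produce an error when missing.
    let required = |name: &str| lookup(name).ok_or_else(|| format!("{} is not set", name));

    Ok(Settings {
        instance_id: required("RUNTARA_INSTANCE_ID")?,
        tenant_id: required("RUNTARA_TENANT_ID")?,
        // Optional variables fall back to the documented defaults.
        server_addr: lookup("RUNTARA_SERVER_ADDR")
            .unwrap_or_else(|| "127.0.0.1:8001".to_string()),
        // Assumption: only the literal string "true" enables this flag.
        skip_cert_verification: lookup("RUNTARA_SKIP_CERT_VERIFICATION").as_deref()
            == Some("true"),
    })
}

fn main() {
    match load_settings(|name| env::var(name).ok()) {
        Ok(settings) => println!("{:?}", settings),
        Err(problem) => eprintln!("configuration error: {}", problem),
    }
}
```

Passing the lookup as a closure keeps the parsing logic separate from `std::env`, which makes the defaults easy to check in a unit test.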
Related Crates
License
This project is licensed under AGPL-3.0-or-later.