Zeal Rust SDK
High-performance Rust SDK for the Zeal Integration Protocol (ZIP), enabling efficient third-party workflow runtime integration with the Zeal workflow editor.
Prerequisites
⚠️ Important: A running Zeal server instance is required for the SDK to function. The SDK communicates with the Zeal server via REST APIs and WebSocket connections.
Starting the Zeal Server
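# Clone the Zeal repository
git clone https://github.com/offbit-ai/zeal.git
cd zeal
# Install dependencies
npm install
# Start the development server
npm run dev
# Or use the start script instead
./start-dev.sh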
The Zeal server will be available at http://localhost:3000 by default.
For detailed setup instructions, deployment options, and configuration, please refer to the Zeal repository.
Features
- Zero-copy JSON parsing with serde_json and simd-json
- Async/await support with tokio and futures
- WebSocket real-time communication with tokio-tungstenite
- HTTP/2 client with reqwest and connection pooling
- Structured logging with tracing and OpenTelemetry support
- Memory-efficient streaming for large payloads
- Built-in retry logic with exponential backoff
- Compile-time safety with strong typing and error handling
- Observable streams with futures-util and custom stream combinators
- Thread-safe concurrent operations with Arc and Mutex
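To make "retry logic with exponential backoff" concrete, here is a minimal standalone sketch of the general technique using only the standard library. It does not show the SDK's internal retry implementation: the names `backoff_delay` and `retry_with_backoff`, the doubling factor, and the cap are all assumptions made for illustration.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
/// Hypothetical helper, not part of zeal-sdk.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // Saturate instead of overflowing for very large attempt counts.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}

/// Calls `op` until it succeeds or `max_attempts` calls have been made,
/// sleeping for an exponentially growing delay between attempts.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    base: Duration,
    max: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(err) if attempt + 1 >= max_attempts => return Err(err),
            Err(_) => {
                std::thread::sleep(backoff_delay(attempt, base, max));
                attempt += 1;
            }
        }
    }
}

fn main() {
    let base = Duration::from_millis(10);
    let max = Duration::from_millis(100);
    // Simulated operation that fails twice, then succeeds on the third call.
    let result: Result<u32, &str> = retry_with_backoff(5, base, max, |attempt| {
        if attempt < 2 {
            Err("unavailable")
        } else {
            Ok(attempt)
        }
    });
    println!("{:?}", result); // prints Ok(2)
}
```

In async code the blocking `std::thread::sleep` would normally be replaced by an async timer such as `tokio::time::sleep`, and production retry loops often add random jitter to the delay.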
Installation
Add to your Cargo.toml:
[dependencies]
zeal-sdk = "1.0.0"
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"
Quick Start
// SubscriptionOptions is assumed to be exported from the crate root
use zeal_sdk::{ClientConfig, NodeTemplate, SubscriptionOptions, ZealClient};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a client pointed at the local Zeal server
    let client = ZealClient::new(ClientConfig {
        base_url: "http://localhost:3000".to_string(),
        ..Default::default()
    })?;

    // Describe the node templates this runtime provides
    let templates = vec![
        NodeTemplate {
            id: "data-processor".to_string(),
            type_name: "processor".to_string(),
            title: "Data Processor".to_string(),
            category: "Processing".to_string(),
            description: "Processes data efficiently".to_string(),
        }
    ];

    // Register the templates under the "my-runtime" namespace (no webhook URL)
    client.templates().register(
        "my-runtime",
        templates,
        None
    ).await?;

    // Subscribe to workflow and node events
    let subscription = client.create_subscription(SubscriptionOptions {
        port: Some(3001),
        namespace: Some("my-runtime".to_string()),
        events: vec!["workflow.*".to_string(), "node.*".to_string()],
        ..Default::default()
    })?;

    // Handle each incoming event
    subscription.on_event(|event| async move {
        println!("Received event: {} - {}", event.event_type, event.data);
    }).await;

    // Start receiving events
    subscription.start().await?;

    Ok(())
}
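The subscription above filters events with patterns such as "workflow.*" and "node.*". The exact matching rules are defined by the Zeal server and are not specified here. The following standalone sketch shows one plausible interpretation, in which a trailing ".*" matches any event type inside that namespace and any other pattern must match exactly. The functions `matches_pattern` and `matches_any` are hypothetical and not part of the SDK.

```rust
/// Returns true if `event_type` matches `pattern`.
/// Assumed semantics (not confirmed by the SDK): a trailing ".*" matches any
/// non-empty suffix after the namespace dot; other patterns match exactly.
fn matches_pattern(pattern: &str, event_type: &str) -> bool {
    match pattern.strip_suffix(".*") {
        Some(namespace) => event_type
            .strip_prefix(namespace)
            // Require a dot followed by at least one more character.
            .map_or(false, |rest| rest.starts_with('.') && rest.len() > 1),
        None => pattern == event_type,
    }
}

/// Returns true if any pattern in `patterns` matches `event_type`.
fn matches_any(patterns: &[&str], event_type: &str) -> bool {
    patterns.iter().any(|pattern| matches_pattern(pattern, event_type))
}

fn main() {
    let patterns = ["workflow.*", "node.*"];
    for event_type in ["workflow.created", "node.failed", "trace.started"] {
        println!("{}: {}", event_type, matches_any(&patterns, event_type));
    }
}
```

A filter like this can be useful on the client side for routing events to different handlers after they have been received.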
Core APIs
Templates API
Register and manage node templates:
use zeal_sdk::templates::*;
let result = client.templates().register(
"my-integration",
vec![template],
Some("http://my-server.com/webhook".to_string())
).await?;
let templates = client.templates().list("my-integration").await?;
let template = client.templates().get("template-id").await?;
Orchestrator API
Programmatically create and modify workflows:
use zeal_sdk::orchestrator::*;

// Create a workflow
let workflow = client.orchestrator().create_workflow(CreateWorkflowRequest {
    name: "My Workflow".to_string(),
    description: Some("Created via Rust SDK".to_string()),
    metadata: None,
}).await?;

// Add a node; clone the ID so `workflow` can still be used below
let node = client.orchestrator().add_node(AddNodeRequest {
    workflow_id: workflow.workflow_id.clone(),
    template_id: "template-id".to_string(),
    position: Position { x: 100.0, y: 100.0 },
    property_values: Some(serde_json::json!({
        "param1": "value1"
    })),
}).await?;

// Connect two nodes ("node1" and "node2" are placeholder IDs)
client.orchestrator().connect_nodes(ConnectNodesRequest {
    workflow_id: workflow.workflow_id,
    source: NodePort {
        node_id: "node1".to_string(),
        port_id: "output".to_string(),
    },
    target: NodePort {
        node_id: "node2".to_string(),
        port_id: "input".to_string(),
    },
}).await?;
Traces API
Submit execution trace data with high performance:
use zeal_sdk::traces::*;

// Create a trace session
let session = client.traces().create_session(CreateTraceSessionRequest {
    workflow_id: "workflow-id".to_string(),
    execution_id: "exec-123".to_string(),
    metadata: Some(TraceMetadata {
        trigger: Some("manual".to_string()),
        environment: Some("production".to_string()),
        tags: vec!["batch-job".to_string()],
    }),
}).await?;

// Build the trace events to submit
let events = vec![
    TraceEvent {
        timestamp: chrono::Utc::now().timestamp_millis(),
        node_id: "node-id".to_string(),
        event_type: TraceEventType::Output,
        data: TraceData {
            size: 1024,
            data_type: "application/json".to_string(),
            preview: Some(serde_json::json!({"processed": 1000})),
            full_data: None,
        },
        duration: Some(std::time::Duration::from_millis(150)),
        ..Default::default()
    }
];

// Submit the events
client.traces().submit_events(&session.session_id, events).await?;

// Mark the session as completed
client.traces().complete_session(
    &session.session_id,
    TraceStatus::Completed
).await?;
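The `TraceData` value above carries a `size` and an optional `preview` alongside an optional `full_data` field, which suggests sending a small preview instead of the whole payload when data is large. How the preview should be derived is not specified here. As one possible approach for text payloads, this standalone sketch computes the byte size and a truncated preview that never splits a UTF-8 character. The function `size_and_preview` is hypothetical and not part of the SDK.

```rust
/// Returns the payload size in bytes and a preview of at most `max_bytes`
/// bytes, cut on a UTF-8 character boundary. Hypothetical helper.
fn size_and_preview(payload: &str, max_bytes: usize) -> (usize, &str) {
    if payload.len() <= max_bytes {
        return (payload.len(), payload);
    }
    // Walk back until the cut point no longer falls inside a character.
    // Index 0 is always a boundary, so this loop terminates.
    let mut end = max_bytes;
    while !payload.is_char_boundary(end) {
        end -= 1;
    }
    (payload.len(), &payload[..end])
}

fn main() {
    let payload = "{\"rows\": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]}";
    let (size, preview) = size_and_preview(payload, 16);
    println!("size = {} bytes, preview = {}", size, preview);
}
```

Note that a truncated JSON string is generally not valid JSON on its own, so a preview built this way would need to be sent as a plain string value rather than parsed.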
Events API
Real-time bidirectional communication:
use zeal_sdk::events::*;
use futures_util::StreamExt;

// Connect to the event stream for a workflow
let mut event_stream = client.events().connect("workflow-id").await?;

// Handle incoming events in a background task
tokio::spawn(async move {
    while let Some(event) = event_stream.next().await {
        match event {
            Ok(ZealEvent::NodeExecuting { node_id, .. }) => {
                println!("Node {} is executing", node_id);
            }
            Ok(ZealEvent::NodeCompleted { node_id, result, .. }) => {
                println!("Node {} completed: {:?}", node_id, result);
            }
            // Ignore event variants not handled above
            Ok(_) => {}
            Err(e) => eprintln!("WebSocket error: {}", e),
        }
    }
});

// Send a runtime event to Zeal
client.events().send_runtime_event(RuntimeEvent {
    event_type: RuntimeEventType::NodeExecutionStart,
    workflow_id: "workflow-id".to_string(),
    data: serde_json::json!({
        "nodeId": "node-123",
        "timestamp": chrono::Utc::now().timestamp_millis()
    }),
}).await?;
Observable Streams
Process events with powerful stream combinators:
use zeal_sdk::observables::*;
use futures_util::{future, StreamExt};

let subscription = client.create_subscription(SubscriptionOptions::default())?;
let stream = subscription.as_observable().await?;

// Each pipeline below consumes `stream`, so the three examples are
// alternatives; obtain a fresh stream for each one you use.

// Collect the first 100 error events
let error_events = stream
    .filter_map(|event| async move {
        if event.event_type.contains("error") {
            Some(ErrorEvent {
                id: event.id,
                error: event.data.get("error").cloned()?,
                timestamp: event.timestamp,
            })
        } else {
            None
        }
    })
    .take(100)
    .collect::<Vec<_>>()
    .await;

// Print node execution events as they arrive
stream
    .filter(|event| {
        // The closure only borrows `event`, so evaluate the predicate
        // synchronously and return an already-completed future.
        future::ready(matches!(
            event.event_type.as_str(),
            "node.executed" | "node.failed"
        ))
    })
    .for_each(|event| async move {
        println!("Node event: {:#?}", event);
    })
    .await;

// Process up to 10 events concurrently and stop at the first failure.
// `process_event` is a user-defined async function returning a `Result`.
let processed_stream = stream
    .map(process_event)
    .buffer_unordered(10)
    .take_while(|processed| {
        let should_continue = processed.is_ok();
        async move { should_continue }
    });
Advanced Features
Connection Pooling and Performance
use zeal_sdk::{ClientConfig, PerformanceConfig, ZealClient};

let client = ZealClient::new(ClientConfig {
    base_url: "http://localhost:3000".to_string(),
    performance: PerformanceConfig {
        max_connections_per_host: 50,
        connection_timeout: std::time::Duration::from_secs(10),
        request_timeout: std::time::Duration::from_secs(30),
        tcp_keepalive: Some(std::time::Duration::from_secs(60)),
        http2_prior_knowledge: true,
        ..Default::default()
    },
    ..Default::default()
})?;
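The configuration above relies on Rust's struct update syntax (`..Default::default()`) to override a few fields and leave the rest at their defaults. The standalone sketch below shows the pattern on a stand-in type. `PerfSettings` is hypothetical: its field names mirror the example above, but the default values are placeholders chosen for illustration and are not the SDK's actual defaults.

```rust
use std::time::Duration;

/// Stand-in for a performance configuration struct (hypothetical).
#[derive(Debug, Clone, PartialEq)]
struct PerfSettings {
    max_connections_per_host: usize,
    connection_timeout: Duration,
    request_timeout: Duration,
    tcp_keepalive: Option<Duration>,
    http2_prior_knowledge: bool,
}

impl Default for PerfSettings {
    fn default() -> Self {
        // Placeholder defaults for illustration only.
        Self {
            max_connections_per_host: 10,
            connection_timeout: Duration::from_secs(5),
            request_timeout: Duration::from_secs(30),
            tcp_keepalive: None,
            http2_prior_knowledge: false,
        }
    }
}

fn main() {
    // Override two fields; every other field comes from `Default`.
    let tuned = PerfSettings {
        max_connections_per_host: 50,
        tcp_keepalive: Some(Duration::from_secs(60)),
        ..Default::default()
    };
    println!("{:#?}", tuned);
}
```

Because unspecified fields fall back to `Default`, code written this way keeps compiling when new fields are added to the struct.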
Batch Operations
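use zeal_sdk::traces::{TraceBatch, TraceEvent};

// Buffer up to 1000 events per batch
let mut batch = TraceBatch::new(1000);

for _ in 0..10000 {
    batch.add_event(TraceEvent {
        // Event fields are elided here; fill them in as in the Traces API example
        ..Default::default()
    })?;

    // Submit a batch whenever one is ready
    if let Some(events) = batch.try_flush() {
        client.traces().submit_events(&session_id, events).await?;
    }
}

// Submit any events left in the buffer
if let Some(events) = batch.flush() {
    client.traces().submit_events(&session_id, events).await?;
}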
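The loop above depends on two operations: one that yields events only when a batch is ready, and one that yields whatever remains. The standalone sketch below illustrates that pattern with a generic buffer. `Batch<T>` is a hypothetical stand-in, not the SDK's `TraceBatch`: it assumes "ready" means the buffer has reached its capacity, and its `add` method cannot fail, unlike `add_event` above.

```rust
/// Hypothetical size-bounded batch buffer (not the SDK's TraceBatch).
struct Batch<T> {
    capacity: usize,
    items: Vec<T>,
}

impl<T> Batch<T> {
    fn new(capacity: usize) -> Self {
        Self { capacity, items: Vec::with_capacity(capacity) }
    }

    fn add(&mut self, item: T) {
        self.items.push(item);
    }

    /// Returns the buffered items only once the batch has reached capacity.
    fn try_flush(&mut self) -> Option<Vec<T>> {
        if self.items.len() >= self.capacity {
            self.flush()
        } else {
            None
        }
    }

    /// Returns whatever is buffered, or `None` if the buffer is empty.
    fn flush(&mut self) -> Option<Vec<T>> {
        if self.items.is_empty() {
            None
        } else {
            // Swap in a fresh buffer and hand the full one to the caller.
            Some(std::mem::replace(
                &mut self.items,
                Vec::with_capacity(self.capacity),
            ))
        }
    }
}

fn main() {
    let mut batch = Batch::new(3);
    let mut submitted_sizes = Vec::new();
    for i in 0..8 {
        batch.add(i);
        if let Some(events) = batch.try_flush() {
            submitted_sizes.push(events.len());
        }
    }
    // Flush the partial batch left over after the loop.
    if let Some(events) = batch.flush() {
        submitted_sizes.push(events.len());
    }
    println!("{:?}", submitted_sizes); // prints [3, 3, 2]
}
```

The final `flush` call matters: without it, events that do not fill a complete batch would never be submitted.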
Structured Logging and Observability
use tracing::{info, instrument};
use zeal_sdk::telemetry::ZealTelemetry;
use zeal_sdk::traces::CreateTraceSessionRequest;
use zeal_sdk::ZealClient;

// Initialize telemetry once at startup, inside a function that returns a Result
ZealTelemetry::init()?;

// `skip(client)` keeps the client out of the recorded span fields
#[instrument(skip(client))]
async fn process_workflow(
    client: &ZealClient,
    workflow_id: &str
) -> Result<(), Box<dyn std::error::Error>> {
    info!("Starting workflow processing: {}", workflow_id);

    let session = client.traces().create_session(CreateTraceSessionRequest {
        workflow_id: workflow_id.to_string(),
        execution_id: uuid::Uuid::new_v4().to_string(),
        metadata: None,
    }).await?;

    info!("Created trace session: {}", session.session_id);
    Ok(())
}
Custom Error Types
use zeal_sdk::errors::*;

match client.templates().get("invalid-id").await {
    Ok(template) => println!("Template: {:#?}", template),
    Err(ZealError::NotFound { id, .. }) => {
        eprintln!("Template '{}' not found", id);
    }
    Err(ZealError::NetworkError { source, retryable }) => {
        eprintln!("Network error: {} (retryable: {})", source, retryable);
        if retryable {
            // Retry the request here, for example after a backoff delay
        }
    }
    Err(ZealError::ValidationError { field, message }) => {
        eprintln!("Validation error in '{}': {}", field, message);
    }
    Err(e) => eprintln!("Other error: {}", e),
}
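Matching on structured error variants, as above, lets callers decide per error whether a retry makes sense. The standalone sketch below shows how such an error type can be modeled and classified. `SdkError` is a hypothetical stand-in whose variants loosely mirror the ones matched above; it is not the SDK's `ZealError`, and the message formats are made up for illustration.

```rust
use std::fmt;

/// Hypothetical error type loosely mirroring the variants matched above.
#[derive(Debug)]
enum SdkError {
    NotFound { resource: String, id: String },
    Network { message: String, retryable: bool },
    Validation { field: String, message: String },
}

impl SdkError {
    /// Only network errors explicitly flagged as retryable should be retried.
    fn is_retryable(&self) -> bool {
        matches!(self, SdkError::Network { retryable: true, .. })
    }
}

impl fmt::Display for SdkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SdkError::NotFound { resource, id } => {
                write!(f, "{} '{}' not found", resource, id)
            }
            SdkError::Network { message, .. } => write!(f, "network error: {}", message),
            SdkError::Validation { field, message } => {
                write!(f, "invalid '{}': {}", field, message)
            }
        }
    }
}

impl std::error::Error for SdkError {}

fn main() {
    let errors = vec![
        SdkError::NotFound { resource: "template".to_string(), id: "abc".to_string() },
        SdkError::Network { message: "connection reset".to_string(), retryable: true },
        SdkError::Validation { field: "name".to_string(), message: "must not be empty".to_string() },
    ];
    for err in &errors {
        println!("{} (retry: {})", err, err.is_retryable());
    }
}
```

Keeping the retry decision in one method avoids scattering the same `if retryable` check across every call site.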
Performance Benchmarks
The Rust SDK is designed for high-performance applications:
- Memory usage: ~2MB baseline, efficient streaming for large payloads
- CPU efficiency: Zero-copy JSON parsing, async I/O
- Throughput: 50,000+ events/second on modern hardware
- Latency: Sub-millisecond event processing overhead
- Concurrent connections: 1000+ WebSocket connections per instance
Examples
See the examples directory for complete working examples.
Platform Support
- Linux (x86_64, aarch64)
- macOS (x86_64, Apple Silicon)
- Windows (x86_64)
License
Apache-2.0