pulse-client 2.6.0

Official Rust client for StreamFlow Pulse — AI Agent Platform

pulse-client — Rust SDK for StreamFlow Pulse

Official Rust client for Pulse — the AI Agent Platform. Async-first, reqwest + serde stack, MSRV 1.82.

use pulse_client::PulseClient;

#[tokio::main]
async fn main() -> Result<(), pulse_client::PulseError> {
    let client = PulseClient::builder()
        .base_url("http://localhost:9090")
        .build()?;

    client.auth().login("alice", "secret").await?;

    for pipeline in client.pipelines().list().await? {
        println!("{}", pipeline["name"]);
    }
    Ok(())
}

Install

[dependencies]
pulse-client = "2.6.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

Requires Rust 1.82+ as a best-effort MSRV (declared in Cargo.toml). CI tests against stable only: the transitive dependency graph (reqwest → hyper-util → tokio-rustls → base64ct → …) shifts its own floor frequently, so chasing an MSRV in CI produces flaky red builds for reasons unrelated to this code. If you hit a build error on a toolchain older than current stable, update your toolchain.

Why pulse-client (Rust)

  • Async-first: every method returns a Future, so the client drops naturally into tokio, axum, and actix applications.
  • Three external deps: reqwest (HTTP, the de facto standard), serde, and serde_json. No Hyper-direct fiddling, no custom transports.
  • rustls by default: no system OpenSSL dance. Cross-compilation works out of the box.
  • Sibling parity: same surface and naming as the Python (pulse-py), JavaScript (@olsisoft/pulse-client), Java (com.streamflow:pulse-client), and Go (github.com/olsisoft/pulse-go) SDKs.
  • Cheap to clone: PulseClient implements Clone, the underlying reqwest::Client pools connections, and the token sits behind Arc<RwLock>. Share a single instance across tasks.
  • Spec-aligned: every method corresponds 1:1 to an endpoint in the Pulse OpenAPI 3.1 spec. Drift is caught at PR time by the in-tree spec invariant tests (B-103).
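
The "cheap to clone" point can be illustrated with a std-only sketch. SketchClient below is a hypothetical stand-in, not the real PulseClient, and it uses OS threads where an application would more likely use tokio tasks. It only shows why a token kept behind Arc<RwLock> is visible through every clone of a handle.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical stand-in for a cloneable client handle (not the real PulseClient).
#[derive(Clone, Default)]
pub struct SketchClient {
    // Every clone points at the same slot, so a token set through one
    // handle is visible through all of them.
    token: Arc<RwLock<Option<String>>>,
}

impl SketchClient {
    /// Stores a new token for all clones of this handle.
    pub fn set_token(&self, token: &str) {
        *self.token.write().unwrap() = Some(token.to_string());
    }

    /// Removes the token for all clones of this handle.
    pub fn clear_token(&self) {
        *self.token.write().unwrap() = None;
    }

    /// Returns a copy of the current token, if any.
    pub fn token(&self) -> Option<String> {
        self.token.read().unwrap().clone()
    }
}

/// Spawns `workers` threads, each holding its own clone, and returns what
/// every worker read from the shared token slot.
pub fn tokens_seen_by_workers(client: &SketchClient, workers: usize) -> Vec<Option<String>> {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let handle = client.clone(); // only bumps a reference count
            thread::spawn(move || handle.token())
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Cloning copies a pointer rather than the token itself, which is why a single instance can be handed to many tasks without re-authenticating in each one.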

Quick start

use std::time::Duration;
use pulse_client::{PulseClient, PulseError};

#[tokio::main]
async fn main() -> Result<(), PulseError> {
    let client = PulseClient::builder()
        .base_url(std::env::var("PULSE_URL").expect("PULSE_URL must be set"))
        .timeout(Duration::from_secs(10))
        .build()?;

    // Login — token cached on the client automatically
    if let Err(err) = client
        .auth()
        .login(
            &std::env::var("PULSE_USER").expect("PULSE_USER must be set"),
            &std::env::var("PULSE_PASSWORD").expect("PULSE_PASSWORD must be set"),
        )
        .await
    {
        if err.is_auth_error() {
            panic!("bad credentials");
        }
        return Err(err);
    }

    // List + inspect
    for p in client.pipelines().list().await? {
        println!("{}: {}", p["name"], p["status"]);
    }

    // Create from a template
    let new_pipeline = client
        .pipelines()
        .create(&serde_json::json!({
            "name": "my-fraud-detector",
            "templateId": "fintech-fraud-detection-realtime",
            "nodes": [
                {"id": "src",  "type": "source", "subType": "kafka-source"},
                {"id": "agt",  "type": "agent",  "subType": "streaming"},
                {"id": "snk",  "type": "sink",   "subType": "telegram"}
            ]
        }))
        .await?;
    println!("created: {}", new_pipeline["id"]);
    Ok(())
}
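
The quick start reads PULSE_URL, PULSE_USER, and PULSE_PASSWORD directly from the environment. One way to fail with a clear message when a variable is missing is to validate all three up front. The sketch below is std-only. PulseEnv, MissingVar, and load_env are hypothetical names for illustration and are not part of pulse-client. The lookup is passed in as a closure so the logic can be exercised without touching the process environment.

```rust
use std::fmt;

/// Hypothetical settings bundle for this sketch (not a pulse-client type).
#[derive(Debug, PartialEq)]
pub struct PulseEnv {
    pub url: String,
    pub user: String,
    pub password: String,
}

/// Names a required variable that was absent or empty.
#[derive(Debug, PartialEq)]
pub struct MissingVar(pub &'static str);

impl fmt::Display for MissingVar {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "environment variable {} is not set", self.0)
    }
}

impl std::error::Error for MissingVar {}

/// Resolves the three variables through `lookup` and reports the first
/// one that is missing or empty.
pub fn load_env<F>(lookup: F) -> Result<PulseEnv, MissingVar>
where
    F: Fn(&str) -> Option<String>,
{
    let require = |name: &'static str| {
        lookup(name)
            .filter(|value| !value.is_empty())
            .ok_or(MissingVar(name))
    };
    Ok(PulseEnv {
        url: require("PULSE_URL")?,
        user: require("PULSE_USER")?,
        password: require("PULSE_PASSWORD")?,
    })
}
```

In an application the closure would typically be `|name| std::env::var(name).ok()`, and the resulting fields would feed base_url(...) and login(...).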

Supported surfaces (v2.6.0)

| Resource | Methods | Notes |
|---|---|---|
| client.auth() | login(user, pass), refresh(refresh_token), organizations(), switch_org(org_id) | Auto-caches JWT after login / refresh / switch_org. |
| client.pipelines() | list(), get(id), create(definition), delete(id) | definition follows the CreatePipelineRequest schema. |
| client.agents() | list(), get(id) | Read-only; agents are owned by pipelines. |
| client.templates() | list() | The 223+ first-party templates. |
| client.users() | list() | Requires USERS_LIST permission (Owner / Platform Admin personas). |
| client.version() | (top-level) | Public; no JWT required. |

Every method returns impl Future<Output = Result<Value, PulseError>>. Value is the re-exported serde_json::Value — full document, no schema-bound DTOs (yet). Schema-bound types land in v3.0.

The full ~112-endpoint surface is documented in Swagger UI at <pulse-server>/api-docs. Less-used methods are added to the client as user demand for them appears.

Authentication

Three patterns:

// 1. Username + password (interactive / CLI tools)
let client = PulseClient::builder()
    .base_url("http://localhost:9090")
    .build()?;
client.auth().login("alice", "secret").await?;

// 2. Pre-minted JWT (CI / service accounts)
let client = PulseClient::builder()
    .base_url("http://localhost:9090")
    .token(std::env::var("PULSE_JWT").unwrap())
    .build()?;

// 3. Hot token rotation (long-running daemons)
client.set_token(freshly_minted_token);
client.clear_token();  // log out

For long-running processes, persist refreshToken from login() and call client.auth().refresh(&refresh_token) before the JWT expires (default 1 h TTL).
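
Deciding when to call refresh can be kept separate from the network call itself. The following std-only sketch tracks the age of the current JWT against the 1 h default TTL mentioned above. TokenClock and the five-minute safety margin used in the usage note are assumptions made for illustration, not part of pulse-client.

```rust
use std::time::{Duration, Instant};

/// Default JWT lifetime stated in the text above (1 h).
pub const DEFAULT_TTL: Duration = Duration::from_secs(3600);

/// Hypothetical helper that remembers when the current JWT was obtained.
pub struct TokenClock {
    issued_at: Instant,
    ttl: Duration,
    margin: Duration,
}

impl TokenClock {
    /// `margin` is how long before expiry a refresh should already happen.
    pub fn new(issued_at: Instant, ttl: Duration, margin: Duration) -> Self {
        TokenClock { issued_at, ttl, margin }
    }

    /// True once `now` is within `margin` of expiry, or past it.
    pub fn needs_refresh(&self, now: Instant) -> bool {
        let age = now.saturating_duration_since(self.issued_at);
        age + self.margin >= self.ttl
    }

    /// How long a background loop may sleep before a refresh is due.
    pub fn sleep_for(&self, now: Instant) -> Duration {
        let age = now.saturating_duration_since(self.issued_at);
        self.ttl.saturating_sub(self.margin).saturating_sub(age)
    }

    /// Call after a successful refresh to restart the clock.
    pub fn mark_refreshed(&mut self, now: Instant) {
        self.issued_at = now;
    }
}
```

A daemon could create the clock with `TokenClock::new(Instant::now(), DEFAULT_TTL, Duration::from_secs(300))` right after login, sleep for `sleep_for(Instant::now())`, call client.auth().refresh(&refresh_token), and then call mark_refreshed.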

Error handling

use pulse_client::PulseError;

match client.pipelines().get("nope").await {
    Ok(p) => println!("{p:?}"),
    Err(PulseError::NotFound { .. }) => println!("doesn't exist — fine"),
    Err(PulseError::RateLimit { retry_after_seconds, .. }) => {
        let wait = retry_after_seconds.unwrap_or(60);
        tokio::time::sleep(std::time::Duration::from_secs(wait as u64)).await;
        // retry
    }
    Err(err) => {
        eprintln!("Pulse call failed: {err}");
        if let Some(code) = err.status_code() {
            eprintln!("status={code}");
        }
        if let Some(body) = err.body() {
            eprintln!("body={body}");
        }
    }
}

Convenience predicates: err.is_auth_error(), is_not_found(), is_validation_error(), is_rate_limited(). Every error carries status_code(), path(), body().
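
The "// retry" placeholder in the match above leaves the retry policy open. One possible policy is sketched here with std only. SketchError is a hypothetical mirror of the variants shown above, not the real PulseError, and the u32 type of retry_after_seconds is assumed. The exponential backoff for 5xx responses is an addition for illustration, not documented client behaviour.

```rust
use std::time::Duration;

/// Hypothetical mirror of the error variants matched above (not the real PulseError).
#[derive(Debug)]
pub enum SketchError {
    NotFound,
    RateLimit { retry_after_seconds: Option<u32> },
    Other { status: Option<u16> },
}

/// Returns how long to wait before the next attempt, or `None` when the
/// call should not be retried. `attempt` counts from 0.
pub fn retry_delay(err: &SketchError, attempt: u32, max_attempts: u32) -> Option<Duration> {
    if attempt >= max_attempts {
        return None;
    }
    match err {
        // Honour the server's hint; fall back to 60 s like the match above.
        SketchError::RateLimit { retry_after_seconds } => {
            Some(Duration::from_secs(u64::from(retry_after_seconds.unwrap_or(60))))
        }
        // Assumed policy for server errors: 1 s, 2 s, 4 s, ... capped at 30 s.
        SketchError::Other { status: Some(code) } if *code >= 500 => {
            let secs = 1u64.checked_shl(attempt).unwrap_or(u64::MAX).min(30);
            Some(Duration::from_secs(secs))
        }
        // Not found, validation failures, and unknown errors are not retried.
        _ => None,
    }
}
```

A caller would loop over attempts, sleep for the returned duration with tokio::time::sleep, and stop as soon as the function returns None.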

Custom reqwest::Client (proxies, mTLS, shared pools, tracing)

let shared = reqwest::Client::builder()
    .timeout(std::time::Duration::from_secs(5))
    .proxy(reqwest::Proxy::all("http://proxy.acme.com:3128")?)
    // .add_root_certificate(...)   // trust an internal CA
    // .identity(...)               // client certificate for mTLS
    .build()?;

let client = PulseClient::builder()
    .base_url("http://pulse.acme.com")
    .http_client(shared)
    .build()?;

Development

git clone https://github.com/olsisoft/streamflow.git
cd streamflow/pulse-rs

cargo build
cargo test
cargo clippy --all-targets -- -D warnings
cargo doc --no-deps --open

CI runs the same on every push touching pulse-rs/ — see .github/workflows/pulse-rs.yaml.

Roadmap

  • v2.5.x — current async API, 5 core resources, version().
  • v2.6.x — expanded resource coverage: backups, schedules, credentials, settings, approvals, chat.
  • v3.0 — schema-bound DTOs (typed structs instead of serde_json::Value); event-stream consumer as a Stream<Item = Event> consuming /api/pulse/events/stream (SSE).
  • B-098 satellite — once olsisoft/pulse-rs exists, this in-tree code lifts out and publishes to crates.io. cargo add pulse-client will switch to the satellite; in-tree continues to mirror for one release cycle.

Track progress in docs/STREAMFLOW-BACKLOG.md under item B-098.

License

Apache 2.0 — same as the parent Pulse repository.