# pulse-client — Rust SDK for StreamFlow Pulse

Official Rust client for [Pulse](https://github.com/olsisoft/streamflow) — the AI Agent Platform. Async-first, **`reqwest` + `serde`** stack, **MSRV 1.82**.

```rust
use pulse_client::PulseClient;

#[tokio::main]
async fn main() -> Result<(), pulse_client::PulseError> {
    let client = PulseClient::builder()
        .base_url("http://localhost:9090")
        .build()?;

    client.auth().login("alice", "secret").await?;

    for pipeline in client.pipelines().list().await? {
        println!("{}", pipeline["name"]);
    }
    Ok(())
}
```

## Install

```toml
[dependencies]
pulse-client = "2.6.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
```

Requires **Rust 1.82+** as a best-effort MSRV (declared in `Cargo.toml`). CI tests against **stable only** — the transitive dep graph (reqwest → hyper-util → tokio-rustls → base64ct → …) shifts its own floor frequently, so chasing an MSRV in CI produces flaky red builds for reasons unrelated to this code. If you hit a build error on a Rust older than stable, bump your toolchain.

## Why pulse-client (Rust)

- **Async-first** — every method returns `Future`. Drops naturally into tokio + axum + actix.
- **Three external deps** — `reqwest` (HTTP, the de facto standard) + `serde` + `serde_json`. No Hyper-direct fiddling, no custom transports.
- **`rustls` by default** — no system OpenSSL dance. Cross-compile works out of the box.
- **Sibling parity** — same surface + naming as the Python (`pulse-py`), JavaScript (`@olsisoft/pulse-client`), Java (`com.streamflow:pulse-client`), and Go (`github.com/olsisoft/pulse-go`) SDKs.
- **Cheap to clone** — `PulseClient: Clone`, the underlying `reqwest::Client` pools connections, the token sits behind `Arc<RwLock>`. Share a single instance across tasks.
- **Spec-aligned** — every method corresponds 1:1 to an endpoint in the [Pulse OpenAPI 3.1 spec](../streamflow-pulse/src/main/resources/openapi/openapi.yaml). Drift is caught at PR time by the in-tree spec invariant tests (B-103).
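
The "cheap to clone" point can be illustrated with a standard-library-only sketch. `SharedToken` is a hypothetical stand-in, not the crate's actual type; it only shows why clones of a handle built on `Arc<RwLock<...>>` all observe the same token.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a client handle: cloning copies the `Arc`,
/// not the token, so every clone sees the same state.
#[derive(Clone, Default)]
pub struct SharedToken {
    inner: Arc<RwLock<Option<String>>>,
}

impl SharedToken {
    /// Replace the token for this handle and all of its clones.
    pub fn set(&self, token: impl Into<String>) {
        *self.inner.write().expect("token lock poisoned") = Some(token.into());
    }

    /// Drop the token (the equivalent of logging out).
    pub fn clear(&self) {
        *self.inner.write().expect("token lock poisoned") = None;
    }

    /// Snapshot the current token, if any.
    pub fn get(&self) -> Option<String> {
        self.inner.read().expect("token lock poisoned").clone()
    }
}
```

Setting the token through one clone makes it visible through every other clone, which is what makes sharing a single client across tasks practical.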

## Quick start

```rust
use std::time::Duration;
use pulse_client::{PulseClient, PulseError};

#[tokio::main]
async fn main() -> Result<(), PulseError> {
    let client = PulseClient::builder()
        .base_url(std::env::var("PULSE_URL").unwrap())
        .timeout(Duration::from_secs(10))
        .build()?;

    // Login — token cached on the client automatically
    if let Err(err) = client
        .auth()
        .login(
            &std::env::var("PULSE_USER").unwrap(),
            &std::env::var("PULSE_PASSWORD").unwrap(),
        )
        .await
    {
        if err.is_auth_error() {
            panic!("bad credentials");
        }
        return Err(err);
    }

    // List + inspect
    for p in client.pipelines().list().await? {
        println!("{} — {}", p["name"], p["status"]);
    }

    // Create from a template
    let new_pipeline = client
        .pipelines()
        .create(&serde_json::json!({
            "name": "my-fraud-detector",
            "templateId": "fintech-fraud-detection-realtime",
            "nodes": [
                {"id": "src",  "type": "source", "subType": "kafka-source"},
                {"id": "agt",  "type": "agent",  "subType": "streaming"},
                {"id": "snk",  "type": "sink",   "subType": "telegram"}
            ]
        }))
        .await?;
    println!("created: {}", new_pipeline["id"]);
    Ok(())
}
```
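
The `unwrap()` calls above panic with a generic message when a variable is missing. A small helper can turn that into a readable error. This is a sketch using only the standard library; `required_env` is a hypothetical name and not part of pulse-client.

```rust
use std::env;

/// Read a required environment variable, naming it in the error message.
/// `required_env` is an illustrative helper, not a pulse-client API.
pub fn required_env(name: &str) -> Result<String, String> {
    match env::var(name) {
        // Treat a blank value the same as a configuration mistake.
        Ok(value) if !value.trim().is_empty() => Ok(value),
        Ok(_) => Err(format!("environment variable {name} is set but empty")),
        Err(env::VarError::NotPresent) => {
            Err(format!("environment variable {name} is not set"))
        }
        Err(env::VarError::NotUnicode(_)) => {
            Err(format!("environment variable {name} is not valid UTF-8"))
        }
    }
}
```

A caller could then write `required_env("PULSE_URL")` and report the returned message before exiting, instead of unwrapping.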

## Supported surfaces (v2.6.0)

| Resource | Methods | Notes |
|---|---|---|
| `client.auth()` | `login(user, pass)`, `refresh(refresh_token)`, `organizations()`, `switch_org(org_id)` | Auto-caches JWT after login / refresh / switch_org. |
| `client.pipelines()` | `list()`, `get(id)`, `create(definition)`, `delete(id)` | `definition` follows the CreatePipelineRequest schema. |
| `client.agents()` | `list()`, `get(id)` | Read-only — agents are owned by pipelines. |
| `client.templates()` | `list()` | The 223+ first-party templates. |
| `client.users()` | `list()` | Requires USERS_LIST permission (Owner / Platform Admin personas). |
| `client.version()` | (top-level method, called directly on the client) | Public — no JWT required. |

Every method returns `impl Future<Output = Result<Value, PulseError>>`. `Value` is the re-exported `serde_json::Value` — full document, no schema-bound DTOs (yet). Schema-bound types land in v3.0.

The full ~112-endpoint surface is documented in Swagger UI at `<pulse-server>/api-docs`. Less-used methods are added to the SDK as users ask for them.

## Authentication

Three patterns:

```rust
// 1. Username + password (interactive / CLI tools)
let client = PulseClient::builder()
    .base_url("http://localhost:9090")
    .build()?;
client.auth().login("alice", "secret").await?;

// 2. Pre-minted JWT (CI / service accounts)
let client = PulseClient::builder()
    .base_url("http://localhost:9090")
    .token(std::env::var("PULSE_JWT").unwrap())
    .build()?;

// 3. Hot token rotation (long-running daemons)
client.set_token(freshly_minted_token);
client.clear_token();  // log out
```

For long-running processes, persist the `refreshToken` returned by `login()` and call `client.auth().refresh(&refresh_token)` before the JWT expires (default TTL: 1 h).
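
One way to decide when to refresh is to record when the token was issued and refresh once a safety margin before expiry is reached. The sketch below assumes the 1 h default TTL mentioned above. `TokenClock` and the margin are illustrative choices, not part of the crate.

```rust
use std::time::{Duration, Instant};

/// Tracks token age so a daemon knows when to call `auth().refresh(...)`.
/// Hypothetical helper; not part of pulse-client.
pub struct TokenClock {
    issued_at: Instant,
    ttl: Duration,
    margin: Duration,
}

impl TokenClock {
    pub fn new(issued_at: Instant, ttl: Duration, margin: Duration) -> Self {
        Self { issued_at, ttl, margin }
    }

    /// True once `now` is within `margin` of the expiry (or past it).
    pub fn needs_refresh(&self, now: Instant) -> bool {
        // If the margin exceeds the TTL, refresh immediately.
        let refresh_after = self.ttl.saturating_sub(self.margin);
        now.saturating_duration_since(self.issued_at) >= refresh_after
    }
}
```

With a 3600 s TTL and a 300 s margin, `needs_refresh` starts returning `true` 3300 s after the token was issued.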

## Error handling

```rust
use pulse_client::PulseError;

match client.pipelines().get("nope").await {
    Ok(p) => println!("{p:?}"),
    Err(PulseError::NotFound { .. }) => println!("doesn't exist — fine"),
    Err(PulseError::RateLimit { retry_after_seconds, .. }) => {
        let wait = retry_after_seconds.unwrap_or(60);
        tokio::time::sleep(std::time::Duration::from_secs(wait as u64)).await;
        // retry
    }
    Err(err) => {
        eprintln!("Pulse call failed: {err}");
        if let Some(code) = err.status_code() {
            eprintln!("status={code}");
        }
        if let Some(body) = err.body() {
            eprintln!("body={body}");
        }
    }
}
```

Convenience predicates: `err.is_auth_error()`, `is_not_found()`, `is_validation_error()`, `is_rate_limited()`. Every error carries `status_code()`, `path()`, `body()`.
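
The `// retry` placeholder in the example leaves the wait policy open. Here is a minimal sketch of one policy: prefer the server's `retry_after_seconds` hint when present, and fall back to capped exponential backoff otherwise. `retry_delay`, the `Option<u64>` parameter type, and the 60 s cap are assumptions for illustration, not crate APIs; convert the error's field to `u64` as needed.

```rust
use std::time::Duration;

/// Upper bound on any single wait (illustrative value).
const MAX_DELAY: Duration = Duration::from_secs(60);

/// Pick how long to wait before retry number `attempt` (0-based).
/// Honors the server hint if given, else 1 s, 2 s, 4 s, ... capped at MAX_DELAY.
pub fn retry_delay(retry_after_seconds: Option<u64>, attempt: u32) -> Duration {
    let delay = match retry_after_seconds {
        Some(secs) => Duration::from_secs(secs),
        None => {
            // 2^attempt seconds; checked_shl returns None once attempt >= 64.
            let secs = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
            Duration::from_secs(secs)
        }
    };
    delay.min(MAX_DELAY)
}
```

The result can be passed straight to `tokio::time::sleep` inside a bounded retry loop.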

## Custom reqwest::Client (proxies, mTLS, shared pools, tracing)

```rust
let shared = reqwest::Client::builder()
    .timeout(std::time::Duration::from_secs(5))
    .proxy(reqwest::Proxy::all("http://proxy.acme.com:3128")?)
    // .add_root_certificate(...)   // trust an internal / private CA
    // .identity(...)               // client certificate for mTLS
    .build()?;

let client = PulseClient::builder()
    .base_url("http://pulse.acme.com")
    .http_client(shared)
    .build()?;
```

## Development

```bash
git clone https://github.com/olsisoft/streamflow.git
cd streamflow/pulse-rs

cargo build
cargo test
cargo clippy --all-targets -- -D warnings
cargo doc --no-deps --open
```

CI runs the same commands on every push touching `pulse-rs/` — see `.github/workflows/pulse-rs.yaml`.

## Roadmap

- **v2.5.x** — current async API, 5 core resources, `version()`.
- **v2.6.x** — expanded resource coverage: backups, schedules, credentials, settings, approvals, chat.
- **v3.0** — schema-bound DTOs (typed structs instead of `serde_json::Value`); event-stream consumer as a `Stream<Item = Event>` consuming `/api/pulse/events/stream` (SSE).
- **B-098 satellite** — once `olsisoft/pulse-rs` exists, this in-tree code lifts out and publishes to crates.io. `cargo add pulse-client` will switch to the satellite; in-tree continues to mirror for one release cycle.

Track progress in [`docs/STREAMFLOW-BACKLOG.md`](../docs/STREAMFLOW-BACKLOG.md) under item **B-098**.

## License

Apache 2.0 — same as the parent Pulse repository.