# pulse-client — Rust SDK for StreamFlow Pulse
Official Rust client for [Pulse](https://github.com/olsisoft/streamflow) — the AI Agent Platform. Async-first, **`reqwest` + `serde`** stack, **MSRV 1.82**.
```rust
use pulse_client::PulseClient;

#[tokio::main]
async fn main() -> Result<(), pulse_client::PulseError> {
    let client = PulseClient::builder()
        .base_url("http://localhost:9090")
        .build()?;

    client.auth().login("alice", "secret").await?;

    for pipeline in client.pipelines().list().await? {
        println!("{}", pipeline["name"]);
    }
    Ok(())
}
```
## Install
```toml
[dependencies]
pulse-client = "2.6.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
```
Requires **Rust 1.82+** as a best-effort MSRV (declared in `Cargo.toml`). CI tests against **stable only** — the transitive dep graph (reqwest → hyper-util → tokio-rustls → base64ct → …) shifts its own floor frequently, so chasing an MSRV in CI produces flaky red builds for reasons unrelated to this code. If you hit a build error on a Rust older than stable, bump your toolchain.
## Why pulse-client (Rust)
- **Async-first** — every method returns `Future`. Drops naturally into tokio + axum + actix.
- **Three external deps** — `reqwest` (HTTP, the de facto standard) + `serde` + `serde_json`. No Hyper-direct fiddling, no custom transports.
- **`rustls` by default** — no system OpenSSL dance. Cross-compile works out of the box.
- **Sibling parity** — same surface + naming as the Python (`pulse-py`), JavaScript (`@olsisoft/pulse-client`), Java (`com.streamflow:pulse-client`), and Go (`github.com/olsisoft/pulse-go`) SDKs.
- **Cheap to clone** — `PulseClient: Clone`, the underlying `reqwest::Client` pools connections, the token sits behind `Arc<RwLock>`. Share a single instance across tasks.
- **Spec-aligned** — every method corresponds 1:1 to an endpoint in the [Pulse OpenAPI 3.1 spec](../streamflow-pulse/src/main/resources/openapi/openapi.yaml). Drift caught at PR time by the in-tree spec invariant tests (B-103).
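The "cheap to clone" point above can be illustrated with a std-only sketch. `Handle` is a hypothetical stand-in, not the SDK's actual `PulseClient` type; it only models the described `Arc<RwLock>` token layout to show why a token set through one clone is visible through every other clone.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a cheaply clonable client handle.
/// Not the real `PulseClient`; it only models the shared-token layout.
#[derive(Clone, Default)]
pub struct Handle {
    // Every clone points at the same lock, so cloning copies one pointer.
    token: Arc<RwLock<Option<String>>>,
}

impl Handle {
    /// Replace the cached token for this handle and all of its clones.
    pub fn set_token(&self, token: impl Into<String>) {
        *self.token.write().expect("token lock poisoned") = Some(token.into());
    }

    /// Drop the cached token for this handle and all of its clones.
    pub fn clear_token(&self) {
        *self.token.write().expect("token lock poisoned") = None;
    }

    /// Snapshot the current token, if any.
    pub fn token(&self) -> Option<String> {
        self.token.read().expect("token lock poisoned").clone()
    }
}
```

Under this layout, handing a clone to each spawned task costs one reference-count increment, and a token rotation performed on any clone takes effect for all of them.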
## Quick start
```rust
use std::time::Duration;

use pulse_client::{PulseClient, PulseError};

#[tokio::main]
async fn main() -> Result<(), PulseError> {
    let client = PulseClient::builder()
        .base_url(std::env::var("PULSE_URL").unwrap())
        .timeout(Duration::from_secs(10))
        .build()?;

    // Login: the token is cached on the client automatically
    if let Err(err) = client
        .auth()
        .login(
            &std::env::var("PULSE_USER").unwrap(),
            &std::env::var("PULSE_PASSWORD").unwrap(),
        )
        .await
    {
        if err.is_auth_error() {
            panic!("bad credentials");
        }
        return Err(err);
    }

    // List + inspect
    for p in client.pipelines().list().await? {
        println!("{} — {}", p["name"], p["status"]);
    }

    // Create from a template
    let new_pipeline = client
        .pipelines()
        .create(&serde_json::json!({
            "name": "my-fraud-detector",
            "templateId": "fintech-fraud-detection-realtime",
            "nodes": [
                {"id": "src", "type": "source", "subType": "kafka-source"},
                {"id": "agt", "type": "agent", "subType": "streaming"},
                {"id": "snk", "type": "sink", "subType": "telegram"}
            ]
        }))
        .await?;
    println!("created: {}", new_pipeline["id"]);
    Ok(())
}
```
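The quick start calls `.unwrap()` on `PULSE_URL`, which panics with a terse message when the variable is missing. A minimal sketch of a friendlier check follows. `resolve_base_url` is a hypothetical helper, not part of the SDK, and both the scheme check and the trailing-slash trimming are assumptions about what a caller might want.

```rust
/// Hypothetical helper (not part of pulse-client): validate a raw base URL,
/// for example the value of `PULSE_URL`.
/// Returns the trimmed URL without trailing slashes, or a readable error.
pub fn resolve_base_url(raw: Option<&str>) -> Result<String, String> {
    // Treat a missing, empty, or whitespace-only value as "not set".
    let url = raw
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .ok_or_else(|| "PULSE_URL is not set".to_string())?;

    // Assumed rule: only plain HTTP(S) base URLs are accepted.
    if !(url.starts_with("http://") || url.starts_with("https://")) {
        return Err(format!(
            "PULSE_URL must start with http:// or https://, got {url:?}"
        ));
    }

    // Assumed normalization: drop trailing slashes.
    Ok(url.trim_end_matches('/').to_string())
}
```

It could be fed from the environment with `resolve_base_url(std::env::var("PULSE_URL").ok().as_deref())` and the resulting string passed to `.base_url(...)`.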
## Supported surfaces (v2.6.0)
| Resource | Methods | Notes |
|---|---|---|
| `client.auth()` | `login(user, pass)`, `refresh(refresh_token)`, `organizations()`, `switch_org(org_id)` | Auto-caches JWT after login / refresh / switch_org. |
| `client.pipelines()` | `list()`, `get(id)`, `create(definition)`, `delete(id)` | `definition` follows the CreatePipelineRequest schema. |
| `client.agents()` | `list()`, `get(id)` | Read-only; agents are owned by pipelines. |
| `client.templates()` | `list()` | The 223+ first-party templates. |
| `client.users()` | `list()` | Requires USERS_LIST permission (Owner / Platform Admin personas). |
| `client.version()` | top-level | Public; no JWT required. |
Every method returns `impl Future<Output = Result<Value, PulseError>>`. `Value` is the re-exported `serde_json::Value` — full document, no schema-bound DTOs (yet). Schema-bound types land in v3.0.
The full surface of roughly 112 endpoints is documented in Swagger UI at `<pulse-server>/api-docs`. Less-used methods are added to the SDK as users ask for them.
## Authentication
Three patterns:
```rust
// 1. Username + password (interactive / CLI tools)
let client = PulseClient::builder()
    .base_url("http://localhost:9090")
    .build()?;
client.auth().login("alice", "secret").await?;

// 2. Pre-minted JWT (CI / service accounts)
let client = PulseClient::builder()
    .base_url("http://localhost:9090")
    .token(std::env::var("PULSE_JWT").unwrap())
    .build()?;

// 3. Hot token rotation (long-running daemons)
client.set_token(freshly_minted_token);
client.clear_token(); // log out
```
For long-running processes, persist `refreshToken` from `login()` and call `client.auth().refresh(&refresh_token)` before the JWT expires (default 1 h TTL).
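Deciding when to refresh can be sketched with a small std-only helper. `refresh_delay` is a hypothetical function, not part of the SDK, and the safety margin is an assumption: with the 1 h default TTL and a 5-minute margin it schedules the refresh 55 minutes after the token was issued.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of pulse-client): how long to wait after a
/// token is issued before refreshing it.
/// `ttl` is the token lifetime; `margin` is the safety window before expiry.
/// Falls back to half the TTL when the margin would swallow the whole lifetime.
pub fn refresh_delay(ttl: Duration, margin: Duration) -> Duration {
    if margin < ttl {
        ttl - margin
    } else {
        ttl / 2
    }
}
```

A daemon could sleep for `refresh_delay(...)` with `tokio::time::sleep`, call `client.auth().refresh(&refresh_token)`, and repeat.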
## Error handling
```rust
use pulse_client::PulseError;

match client.pipelines().get("nope").await {
    Ok(p) => println!("{p:?}"),
    Err(PulseError::NotFound { .. }) => println!("doesn't exist — fine"),
    Err(PulseError::RateLimit { retry_after_seconds, .. }) => {
        let wait = retry_after_seconds.unwrap_or(60);
        tokio::time::sleep(std::time::Duration::from_secs(wait as u64)).await;
        // retry
    }
    Err(err) => {
        eprintln!("Pulse call failed: {err}");
        if let Some(code) = err.status_code() {
            eprintln!("status={code}");
        }
        if let Some(body) = err.body() {
            eprintln!("body={body}");
        }
    }
}
```
Convenience predicates: `err.is_auth_error()`, `is_not_found()`, `is_validation_error()`, `is_rate_limited()`. Every error carries `status_code()`, `path()`, `body()`.
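The `// retry` step in the rate-limit arm needs a wait policy. One possible sketch, using only the standard library: honor the server's hint when it is present, otherwise back off exponentially. `retry_wait` is a hypothetical helper, not part of the SDK; the `u64` hint type, the 1 s base, and the 60 s cap are assumptions (the cap mirrors the 60 s fallback in the example above).

```rust
use std::time::Duration;

/// Hypothetical helper (not part of pulse-client): pick a wait before
/// retrying a rate-limited call.
/// `retry_after_seconds` is the server hint, if any; `attempt` starts at 0.
pub fn retry_wait(retry_after_seconds: Option<u64>, attempt: u32) -> Duration {
    const BASE_SECS: u64 = 1; // assumed starting backoff
    const CAP_SECS: u64 = 60; // assumed ceiling

    match retry_after_seconds {
        // Trust the server hint when present.
        Some(secs) => Duration::from_secs(secs),
        // Otherwise double the wait per attempt (1 s, 2 s, 4 s, ...), capped.
        None => {
            // `checked_shl` returns None once the shift would overflow 64 bits.
            let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
            Duration::from_secs(BASE_SECS.saturating_mul(factor).min(CAP_SECS))
        }
    }
}
```

Inside a retry loop, the result would be passed to `tokio::time::sleep` before the next attempt, with a bound on the number of attempts so a persistent limit does not loop forever.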
## Custom reqwest::Client (proxies, mTLS, shared pools, tracing)
```rust
let shared = reqwest::Client::builder()
    .timeout(std::time::Duration::from_secs(5))
    .proxy(reqwest::Proxy::all("http://proxy.acme.com:3128")?)
    // .add_root_certificate(...) // trust an internal CA
    // .identity(...)             // client certificate for mTLS
    .build()?;

let client = PulseClient::builder()
    .base_url("http://pulse.acme.com")
    .http_client(shared)
    .build()?;
```
## Development
```bash
git clone https://github.com/olsisoft/streamflow.git
cd streamflow/pulse-rs
cargo build
cargo test
cargo clippy --all-targets -- -D warnings
cargo doc --no-deps --open
```
CI runs the same on every push touching `pulse-rs/` — see `.github/workflows/pulse-rs.yaml`.
## Roadmap
- **v2.5.x** — current async API, 5 core resources, `version()`.
- **v2.6.x** — expanded resource coverage: backups, schedules, credentials, settings, approvals, chat.
- **v3.0** — schema-bound DTOs (typed structs instead of `serde_json::Value`); event-stream consumer as a `Stream<Item = Event>` consuming `/api/pulse/events/stream` (SSE).
- **B-098 satellite** — once `olsisoft/pulse-rs` exists, this in-tree code lifts out and publishes to crates.io. `cargo add pulse-client` will switch to the satellite; in-tree continues to mirror for one release cycle.
Track progress in [`docs/STREAMFLOW-BACKLOG.md`](../docs/STREAMFLOW-BACKLOG.md) under item **B-098**.
## License
Apache 2.0 — same as the parent Pulse repository.