PyWatt SDK

Standardized SDK for building PyWatt modules in Rust.
Overview
This crate provides the core building blocks for creating PyWatt modules that integrate seamlessly with the PyWatt orchestrator. It handles:
- IPC Handshake: Standardized startup communication (
read_init, send_announce).
- Logging: Consistent logging to
stderr with secret redaction (init_module).
- Secret Management: Secure retrieval and rotation handling via the integrated
secret_client module (get_secret, subscribe_secret_rotations).
- Runtime IPC: Background task for processing orchestrator messages (
process_ipc_messages).
- Core Types: Re-exports essential types from the integrated
ipc_types module (OrchestratorInit, ModuleAnnounce, etc.).
- (Optional) Macros: Proc macros for simplifying module definition (requires
proc_macros feature).
- (Optional) JWT Auth: Middleware for Axum route protection (requires
jwt_auth feature).
Installation
Add this to your module's Cargo.toml:
[dependencies]
pywatt_sdk = "0.1.0"
tokio = { version = "1", features = ["full"] }
axum = "0.7"
tracing = "0.1"
Quickstart Example
Here's a minimal module using the SDK with Axum:
use pywatt_sdk::prelude::*;
use axum::{routing::get, Router, extract::State};
use tokio::net::{TcpListener, UnixListener};
use std::sync::Arc;
#[derive(Clone)]
struct MyModuleState {
message: String,
}
type SharedState = AppState<MyModuleState>;
async fn health_handler() -> &'static str {
"OK"
}
async fn custom_handler(State(state): State<SharedState>) -> String {
format!("Module {} says: {}", state.module_id(), state.user_state.message)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
init_module();
let init: OrchestratorInit = read_init().await?;
tracing::info!(?init, "Received orchestrator initialization");
let secret_client = get_module_secret_client(&init.orchestrator_api, &init.module_id).await?;
let initial_message = match get_secret(&secret_client, "INITIAL_MESSAGE").await {
Ok(secret) => secret.expose_secret().clone(),
Err(_) => {
tracing::warn!("INITIAL_MESSAGE secret not found, using default.");
"Default message".to_string()
}
};
let my_state = MyModuleState { message: initial_message };
let app_state = AppState::new(
init.module_id.clone(),
init.orchestrator_api.clone(),
secret_client.clone(),
my_state
);
let app_state_clone = app_state.clone(); let keys = vec!["INITIAL_MESSAGE".to_string()];
tokio::spawn(subscribe_secret_rotations(secret_client.clone(), keys, move |key, new_val| {
if key == "INITIAL_MESSAGE" {
let new_message = new_val.expose_secret().clone();
tracing::info!(%key, new_message = %new_message, "Secret rotated, state needs update (implementation detail)");
}
}));
let app = Router::new()
.route("/health", get(health_handler))
.route("/custom", get(custom_handler))
.with_state(app_state.clone());
let serve_future = match &init.listen {
ListenAddress::Tcp(addr) => {
tracing::info!(%addr, "Binding TCP listener");
let listener = TcpListener::bind(addr).await?;
axum::serve(listener, app.into_make_service())
}
ListenAddress::Unix(path) => {
tracing::info!(path = %path.display(), "Binding Unix listener");
if path.exists() {
tokio::fs::remove_file(path).await?;
}
let listener = UnixListener::bind(path)?;
axum::serve(listener, app.into_make_service())
}
};
let announce = ModuleAnnounce {
listen: init.listen_address().to_string_lossy(), endpoints: vec![
AnnouncedEndpoint { path: "/health".into(), methods: vec!["GET".into()], auth: None },
AnnouncedEndpoint { path: "/custom".into(), methods: vec!["GET".into()], auth: None },
],
};
send_announce(&announce)?;
tracing::info!(?announce, "Sent announcement to orchestrator");
let ipc_handle = tokio::spawn(process_ipc_messages());
tracing::info!("Module server starting");
tokio::select! {
result = serve_future => {
if let Err(e) = result {
tracing::error!(error = %e, "Server exited with error");
}
}
_ = tokio::signal::ctrl_c() => {
tracing::info!("Received Ctrl+C");
}
ipc_res = ipc_handle => {
match ipc_res {
Ok(_) => tracing::info!("IPC handler finished cleanly"),
Err(e) => tracing::error!(error = %e, "IPC handler finished with error"),
}
}
}
tracing::info!("Module shutting down gracefully");
Ok(())
}
## Core Functions & Types
(See the `prelude` module for the most common items)
- `init_module()`: Configures logging to stderr with secret redaction.
- `read_init() -> Result<OrchestratorInit, InitError>`: Reads the initial handshake message from stdin.
- `send_announce(announce: &ModuleAnnounce) -> Result<(), AnnounceError>`: Sends the module's announcement message to stdout.
- `process_ipc_messages()`: An async function to spawn that listens for runtime IPC messages (like secret rotations, shutdown) from stdin.
- `get_module_secret_client(...)`: Creates a `SecretClient` instance.
- `get_secret(...)`: Retrieves a secret, ensuring it's registered for redaction.
- `subscribe_secret_rotations(...)`: Spawns a task to handle secret rotation notifications.
- `AppState<T>`: A generic state container holding SDK state and optional user state `T`.
- Types from `ipc_types`: `OrchestratorInit`, `ModuleAnnounce`, `AnnouncedEndpoint`, `ListenAddress`, etc.
- Types from `secret_client`: `SecretClient`, `SecretError`, `RequestMode`.
## JWT Authentication (Optional Feature)
The SDK provides an optional `jwt_auth` feature to easily protect Axum routes with JWT Bearer token validation.
**Enable Feature**:
```toml
[dependencies]
# Ensure the jwt_auth feature is enabled
pywatt_sdk = { version = "0.1.0", features = ["jwt_auth"] }
axum = { version = "0.7", features = ["json"] }
# ... other deps
Usage:
-
Request the HMAC signing secret (e.g., JWT_HMAC_SECRET) in your module configuration (how secrets are requested depends on the orchestrator/module definition).
-
Fetch the secret using get_secret:
use pywatt_sdk::prelude::*;
use axum::Router;
async fn build_router_with_auth(state: AppState<()>) -> Result<Router, Box<dyn std::error::Error>> {
let jwt_secret_value = get_secret(state.secret_client(), "JWT_HMAC_SECRET").await?;
use pywatt_sdk::ext::RouterJwtExt;
let app = Router::new()
.route("/protected/resource", axum::routing::get(protected_handler))
.with_jwt(jwt_secret_value); Ok(app)
}
# async fn protected_handler() -> &'static str { "Protected" }
-
The middleware automatically:
- Expects an
Authorization: Bearer <token> header.
- Validates the token using the provided HMAC secret (HS256 algorithm).
- Returns
401 Unauthorized if the header is missing, the token is invalid, expired, or has the wrong signature.
- On success, inserts the decoded JWT claims (as
serde_json::Value) into the request extensions.
-
Access claims in your handler:
use axum::extract::Extension;
use serde_json::Value;
async fn protected_handler(Extension(claims): Extension<Value>) -> String {
let user_id = claims.get("sub").and_then(|v| v.as_str()).unwrap_or("anonymous");
format!("Hello, protected user {}!", user_id)
}
Contributing
Contributions are welcome! Please follow standard Rust practices and ensure tests pass.
License
This project is licensed under the MIT OR Apache-2.0 license. See LICENSE-MIT or LICENSE-APACHE for details.