lb-sparkplugb-rs 0.1.1

Eclipse Sparkplug B 3.0.0 protocol library — payload codec, topic namespace, sequence/bdSeq, and (phased) edge & host roles, in Rust.

lb-sparkplugb-rs (sparkplug_b)

A standalone, spec-driven Rust implementation of the Eclipse Sparkplug B 3.0.0 specification. It covers the whole protocol stack:

  • the wire payload model and a hand-written proto2 codec (no protoc/prost-build);
  • the topic namespace, message types, and validated identifiers;
  • the sequence (seq) / bdSeq machinery, with pluggable bdSeq persistence;
  • the alias registry and optional compression envelope;
  • async Edge Node + Device and Host Application / Primary Host engines;
  • a rumqttc MQTT v5 transport with TLS / mTLS, plus an embedded-broker end-to-end test.

The crate (package lb-sparkplugb-rs, library sparkplug_b) has no dependency on any SCADA framework. The protobuf codec is hand-written, so the foundation's only runtime dependencies are bytes and thiserror; everything else (compression, the MQTT client, TLS, the embedded broker) is behind an opt-in feature flag.

It is a library, not a binary: there is no default cargo run target. Use cargo test, cargo build, or cargo run --example encode_nbirth.

Status

Layer | Status
Payload model + 35 DataTypes (scalars, arrays, DataSet, Template, properties) | Implemented
Proto2 codec (encode/decode, packed arrays, null, strip-datatypes) | Implemented
Topic namespace + message types + validated IDs | Implemented
Sequence (seq) + bdSeq + persistence (in-memory / file) | Implemented
Alias registry (name ↔ alias ↔ datatype) | Implemented
STATE payload (JSON) | Implemented
Compression envelope (compression feature, DEFLATE/GZIP) | Implemented
Edge Node + Device lifecycle (async engine) | Implemented
Host Application + Primary Host (async engine) | Implemented
rumqttc transport (transport-rumqttc) + embedded-broker e2e (sim) | Implemented
TLS / mTLS (tls feature) + mTLS e2e | Implemented
Multi-server HA + sequence reorder buffer | Roadmap

See ../docs/plan-lb-sparkplugb-rs-sparkplug-b.md for the full plan, ../docs/sparkplug-b-normative-statements.md for the conformance catalog, and ../docs/sparkplug-b-tahu-design-notes.md for the comparison against Eclipse Tahu.

Cargo features

All features are off by default — the bare crate is the pure protocol core.

Feature | Enables | Pulls in
compression | compress::{compress, decompress} DEFLATE/GZIP envelope | flate2
transport-rumqttc | RumqttcTransport, an MQTT v5 MqttTransport impl | rumqttc =0.24, tokio (time)
tls | TLS + mTLS for the rumqttc transport (rustls 0.22, ring-backed; no CryptoProvider install needed) | rumqttc/use-rustls
sim | Embedded rumqttd broker (incl. a TLS listener + client-cert verification) for the e2e tests | rumqttd =0.20, multi-thread tokio

The rumqttc =0.24.0 / rustls 0.22 pin is load-bearing for the "no CryptoProvider install" property — see the note in Cargo.toml before bumping it.
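
Since every feature is opt-in, a consumer has to name the ones it wants. The fragment below is a sketch of a dependency entry, assuming the crate is pulled from a registry under the package name and version shown at the top of this page; the feature selection is only an example.

```toml
[dependencies]
# Package name and version as stated above; the chosen features are illustrative.
lb-sparkplugb-rs = { version = "0.1.1", features = ["transport-rumqttc", "tls"] }
```

The library is still imported in code as sparkplug_b, as in the examples below.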

Architecture

The crate is layered so the protocol core never depends on the engines, and the engines never depend on a concrete MQTT client:

codec  +  model/value/datatype   ← pure: encode/decode, total, panic-free
topic  +  state  +  sequence      ← namespace, STATE JSON, seq/bdSeq
alias                             ← name ↔ alias ↔ datatype bindings
        │
transport (MqttTransport trait)   ← RumqttcTransport behind a feature
        │
edge (EdgeNode)  /  host (HostApplication)   ← async engines, generic over the transport

Both engines spawn no tasks and use no runtime timers — the caller drives a recv_and_handle loop — so they are deterministic and runtime-agnostic. Tests use an in-memory transport; the rumqttc transport supplies the real network loop.
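
The caller-driven design can be illustrated with a toy. The sketch below is not the crate's code: Transport, MemoryTransport, and Engine are hypothetical names, the trait is synchronous for brevity (the real engines are async), and the "handling" is a placeholder echo. It only shows why an engine that is generic over its transport and owns no tasks or timers is easy to test deterministically.

```rust
use std::collections::VecDeque;

/// Hypothetical, synchronous stand-in for a transport trait (the crate's
/// real `MqttTransport` is async and its methods are not shown here).
pub trait Transport {
    fn publish(&mut self, topic: String, payload: Vec<u8>);
    fn recv(&mut self) -> Option<(String, Vec<u8>)>;
}

/// In-memory transport: tests queue inbound messages and inspect what was published.
#[derive(Default)]
pub struct MemoryTransport {
    pub inbound: VecDeque<(String, Vec<u8>)>,
    pub published: Vec<(String, Vec<u8>)>,
}

impl Transport for MemoryTransport {
    fn publish(&mut self, topic: String, payload: Vec<u8>) {
        self.published.push((topic, payload));
    }

    fn recv(&mut self) -> Option<(String, Vec<u8>)> {
        self.inbound.pop_front()
    }
}

/// Toy engine: generic over the transport, no spawned tasks, no timers.
pub struct Engine<T: Transport> {
    transport: T,
}

impl<T: Transport> Engine<T> {
    pub fn new(transport: T) -> Self {
        Engine { transport }
    }

    /// Handle at most one pending message and report its topic as the event.
    /// Placeholder behaviour: echo the payload under `echo/<topic>`.
    pub fn recv_and_handle(&mut self) -> Option<String> {
        let (topic, payload) = self.transport.recv()?;
        self.transport.publish(format!("echo/{topic}"), payload);
        Some(topic)
    }

    /// Borrow the transport so a test can inspect what was published.
    pub fn transport(&self) -> &T {
        &self.transport
    }
}
```

A test queues messages on MemoryTransport, calls recv_and_handle until it returns None, and then asserts on the published list; nothing happens between calls, so the outcome depends only on the queued input.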

Module map

Module | What it provides
model | Payload, Metric, MetaData, PropertySet, PropertySetList, DataSet (validated), Template, Parameter
value | MetricValue, DataSetValue, PropertyValue, ParameterValue: tagged enums fusing datatype + value
datatype | DataType (codes 0..=34) with is_basic / is_array / array_element_width classifiers
codec | encode, decode, EncodeOptions (birth() keeps datatypes, data() strips them)
topic | SparkplugTopic, MessageType, validated GroupId / EdgeNodeId / DeviceId
sequence | Seq, BdSeq, BdSeqStore trait, InMemoryBdSeqStore, FileBdSeqStore
alias | AliasRegistry, MetricKey
state | StatePayload (JSON {"online":…,"timestamp":…})
compress (feature) | Compression, compress, decompress
transport | MqttTransport trait, ConnectOptions, OutboundMessage, IncomingMessage, Qos, TlsConfig, RumqttcTransport (feature)
edge | EdgeNode, EdgeNodeConfig, EdgeState, EdgeEvent, DataSource
host | HostApplication, HostConfig, HostEvent
error | SparkplugError (non-exhaustive), Result

Crate-root constants: SPARKPLUG_B_NAMESPACE (spBv1.0), NODE_CONTROL_REBIRTH, BDSEQ_METRIC_NAME, COMPRESSED_PAYLOAD_UUID.
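
The "tagged enums fusing datatype + value" idea from the value module can be shown in a few lines. This is a reduced sketch, not the crate's MetricValue: the Value enum and datatype_code method are hypothetical, and only four variants are included. The numeric codes are the Sparkplug B datatype indices for Int32, Double, Boolean, and String.

```rust
/// Hypothetical, reduced stand-in for a tagged value enum.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Int32(i32),
    Double(f64),
    Boolean(bool),
    Text(String),
}

impl Value {
    /// The datatype code is derived from the variant, so a value can never
    /// carry a datatype that disagrees with its contents.
    pub fn datatype_code(&self) -> u32 {
        match self {
            Value::Int32(_) => 3,
            Value::Double(_) => 10,
            Value::Boolean(_) => 11,
            Value::Text(_) => 12,
        }
    }
}
```

Because the code is computed rather than stored next to the value, there is no separate field to fall out of sync and therefore nothing to check at runtime.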

Examples

1. Encode / decode a payload

use sparkplug_b::{decode, encode, EncodeOptions, Metric, MetricValue, Payload};

let payload = Payload::new()
    .with_timestamp(1_700_000_000_000)
    .with_seq(0) // NBIRTH carries seq = 0
    .with_metric(Metric::new("Temperature", MetricValue::Double(21.5)))
    .with_metric(Metric::new("Online", MetricValue::Boolean(true)));

let bytes = encode(&payload, EncodeOptions::birth()); // datatypes included
let decoded = decode(&bytes, None).unwrap();          // BIRTH carries its own datatypes
assert_eq!(decoded, payload);

For DATA/CMD messages use EncodeOptions::data() (datatypes stripped) and pass an AliasRegistry built from the birth to decode so stripped datatypes can be recovered.

cargo run --example encode_nbirth
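
To make the role of the registry in DATA decoding concrete, here is a minimal sketch of the underlying idea: a birth binds each alias to a name and datatype, and a later alias-only metric is resolved through that map. The Aliases and Binding types are hypothetical and much simpler than the crate's AliasRegistry; only the try_bind and clear names echo the real API, and their signatures here are assumptions.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Hypothetical record of what a birth declared for one alias.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Binding {
    pub name: String,
    pub datatype: u32,
}

/// Hypothetical, reduced alias table for one Edge Node.
#[derive(Default)]
pub struct Aliases {
    by_alias: HashMap<u64, Binding>,
}

impl Aliases {
    /// Bind an alias seen in a birth; a duplicate alias is rejected.
    pub fn try_bind(&mut self, alias: u64, name: &str, datatype: u32) -> Result<(), String> {
        match self.by_alias.entry(alias) {
            Entry::Occupied(_) => Err(format!("duplicate alias {alias}")),
            Entry::Vacant(slot) => {
                slot.insert(Binding { name: name.to_string(), datatype });
                Ok(())
            }
        }
    }

    /// Recover the name and datatype for a metric that arrived with only an alias.
    pub fn resolve(&self, alias: u64) -> Option<&Binding> {
        self.by_alias.get(&alias)
    }

    /// Forget every binding, for example before processing a new birth.
    pub fn clear(&mut self) {
        self.by_alias.clear();
    }
}
```

A decoder that finds a metric with an alias but no datatype would call resolve and treat a miss as "data before birth".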

2. Parse a topic

use sparkplug_b::{MessageType, SparkplugTopic};

let topic = SparkplugTopic::parse("spBv1.0/Plant1/NBIRTH/Edge1").unwrap();
assert_eq!(topic.message_type(), MessageType::NBirth);
assert_eq!(topic.to_string(), "spBv1.0/Plant1/NBIRTH/Edge1");
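
For readers who want to see the shape of the namespace itself, the following standalone sketch parses the three topic forms with a single match on the token list. It is an illustration only: Topic and parse_topic are hypothetical, the message type is kept as a plain string, and the identifier validation is reduced to a non-empty check, unlike the crate's SparkplugTopic.

```rust
/// Hypothetical topic enum: one variant per topic shape.
#[derive(Debug, PartialEq, Eq)]
pub enum Topic {
    Node { group: String, kind: String, edge: String },
    Device { group: String, kind: String, edge: String, device: String },
    HostState { host_id: String },
}

/// Parse a topic string; returns None for anything outside the three shapes.
pub fn parse_topic(s: &str) -> Option<Topic> {
    let parts: Vec<&str> = s.split('/').collect();
    // Simplified validation: every token must be non-empty.
    if parts.iter().any(|t| t.is_empty()) {
        return None;
    }
    match parts.as_slice() {
        ["spBv1.0", "STATE", host_id] => Some(Topic::HostState {
            host_id: host_id.to_string(),
        }),
        // Node-level message types take exactly four tokens.
        ["spBv1.0", group, kind @ ("NBIRTH" | "NDEATH" | "NDATA" | "NCMD"), edge] => {
            Some(Topic::Node {
                group: group.to_string(),
                kind: kind.to_string(),
                edge: edge.to_string(),
            })
        }
        // Device-level message types take exactly five tokens.
        ["spBv1.0", group, kind @ ("DBIRTH" | "DDEATH" | "DDATA" | "DCMD"), edge, device] => {
            Some(Topic::Device {
                group: group.to_string(),
                kind: kind.to_string(),
                edge: edge.to_string(),
                device: device.to_string(),
            })
        }
        _ => None,
    }
}
```

Because each arm fixes both the token count and the allowed message types, a node message type with a device token (or the reverse) falls through to None rather than producing a malformed value.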

3. Drive an Edge Node (transport-rumqttc feature)

The EdgeNode engine registers the NDEATH as the MQTT will, connects, publishes NBIRTH (seq = 0, the bdSeq and Node Control/Rebirth metrics) and each device's DBIRTH, then reports data by exception. You supply the birth metrics via a DataSource and drive recv_and_handle (which answers NCMD rebirth requests and tracks the primary host).

use sparkplug_b::{
    EdgeNode, EdgeNodeConfig, DataSource, Metric, MetricValue,
    InMemoryBdSeqStore, RumqttcTransport,
};

struct Tags;
impl DataSource for Tags {
    fn node_birth_metrics(&self) -> Vec<Metric> {
        vec![Metric::new("Temperature", MetricValue::Double(20.0))]
    }
    fn device_birth_metrics(&self, _device: &str) -> Vec<Metric> {
        vec![Metric::new("Pressure", MetricValue::Float(1.0))]
    }
}

async fn run() -> sparkplug_b::Result<()> {
    let config = EdgeNodeConfig::new("Plant1", "Edge1", &["Press01"])?;
    let mut node = EdgeNode::new(config, RumqttcTransport::new(), InMemoryBdSeqStore::new(0));

    node.connect(&Tags).await?;                                  // will + NBIRTH + DBIRTH
    node.publish_node_data(vec![
        Metric::new("Temperature", MetricValue::Double(21.5)),
    ]).await?;                                                   // NDATA by exception

    while let Some(event) = node.recv_and_handle(&Tags).await? {
        // handle EdgeEvent::Rebirthed / NodeCommand / DeviceCommand / …
    }
    node.disconnect().await?;                                    // graceful NDEATH
    Ok(())
}

EdgeNodeConfig also exposes use_aliases (assign aliases on births, send alias-only DATA), primary_host_id (gate births on a primary host's STATE), rebirth_debounce, keep-alive, and tls.

4. Run a Host Application / Primary Host

The HostApplication engine publishes a retained online STATE (sharing the offline-STATE will's timestamp), subscribes to the spBv1.0/# namespace, then consumes Edge traffic: it binds aliases on birth, validates each Edge Node's sequence number, resolves alias-only DATA back to names, gates NDEATH on the bdSeq, and requests a (debounced) rebirth on a sequence gap or data-before-birth.

use sparkplug_b::{HostApplication, HostConfig, HostEvent, RumqttcTransport};

async fn run() -> sparkplug_b::Result<()> {
    let mut host = HostApplication::new(HostConfig::new("SCADA1"), RumqttcTransport::new());
    host.start().await?; // connect + retained online STATE + subscribe spBv1.0/#

    while let Some(event) = host.recv_and_handle().await? {
        match event {
            HostEvent::NodeBirth { group, edge, metrics } => { /* cache the metric set */ }
            HostEvent::NodeData  { group, edge, metrics } => { /* update tags */ }
            HostEvent::NodeDeath { group, edge, devices, .. } => { /* mark stale */ }
            _ => {}
        }
    }
    host.shutdown().await?; // retained offline STATE + disconnect
    Ok(())
}

HostApplication can also push commands with publish_node_command / publish_device_command.
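
The per-node sequence validation mentioned at the start of this section comes down to comparing each incoming seq with the previous one plus one, modulo 256. The sketch below shows that check in isolation; SeqTracker and SeqCheck are hypothetical names, and the debouncing and bdSeq gating that the real engine performs are left out.

```rust
/// Outcome of checking one incoming sequence number (hypothetical names).
#[derive(Debug, PartialEq, Eq)]
pub enum SeqCheck {
    InOrder,
    /// A gap was seen; a host would react by requesting a rebirth.
    Gap { expected: u8, got: u8 },
}

/// Tracks the last sequence number accepted from one Edge Node.
pub struct SeqTracker {
    last: u8,
}

impl SeqTracker {
    /// Start tracking from the seq carried by the node's birth.
    pub fn on_birth(seq: u8) -> Self {
        SeqTracker { last: seq }
    }

    /// Accept `got` if it directly follows the last seq (255 is followed by 0).
    pub fn check(&mut self, got: u8) -> SeqCheck {
        let expected = self.last.wrapping_add(1);
        if got == expected {
            self.last = got;
            SeqCheck::InOrder
        } else {
            SeqCheck::Gap { expected, got }
        }
    }
}
```

On a Gap the tracker deliberately keeps its old state: in this sketch the next accepted message would be the birth that follows the rebirth request, which creates a fresh tracker.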

5. Compression envelope (compression feature)

use sparkplug_b::compress::{compress, decompress, Compression};
use sparkplug_b::{EncodeOptions, Metric, MetricValue, Payload};

let payload = Payload::new().with_metric(Metric::new("x", MetricValue::Int32(1)));
let envelope = compress(&payload, EncodeOptions::birth(), Compression::Gzip).unwrap();
let back = decompress(&envelope, None).unwrap();
assert_eq!(back, payload);

6. TLS / mTLS (tls feature)

Set TlsConfig on the engine config. A CA is mandatory (native roots are never loaded); supplying both a client cert and key turns on mTLS. The broker host must match the server certificate's SubjectAltName.

use sparkplug_b::TlsConfig;

let tls = TlsConfig {
    ca_pem: Some(ca_bytes),
    client_cert_pem: Some(cert_bytes), // both present → mTLS
    client_key_pem: Some(key_bytes),
};
// config.tls = Some(tls);

Without the tls feature, requesting TLS fails loudly rather than silently connecting in plaintext.

Build & test

cargo build --all-features
cargo test  --all-features            # 13 test suites incl. an embedded-broker e2e + mTLS e2e
cargo clippy --all-targets --all-features -- -D warnings
cargo fmt --check

The test suite covers codec round-trips (property-based with proptest), hostile-input decoding, composite types (DataSet / Template / PropertySet), compression, topic parsing, the Edge and Host state machines, cross-engine interop, captured wire vectors, and a full edge↔broker↔host end-to-end over the rumqttc/rumqttd pair (plaintext and mTLS).
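
As an illustration of what "hostile-input decoding" has to guard against, here is a standalone protobuf varint reader that reports truncated and over-long input as typed errors instead of panicking. It is a sketch written for this README, not an excerpt from the crate's codec; decode_varint and VarintError are hypothetical names.

```rust
/// Ways a varint can be malformed (hypothetical error type).
#[derive(Debug, PartialEq, Eq)]
pub enum VarintError {
    /// The buffer ended while the continuation bit was still set.
    Truncated,
    /// The encoding does not fit in 64 bits.
    Overflow,
}

/// Decode one base-128 varint from the front of `buf`.
/// Returns the value and the number of bytes consumed.
pub fn decode_varint(buf: &[u8]) -> Result<(u64, usize), VarintError> {
    let mut value: u64 = 0;
    for (i, &byte) in buf.iter().enumerate() {
        // A u64 needs at most ten 7-bit groups.
        if i >= 10 {
            return Err(VarintError::Overflow);
        }
        let low = u64::from(byte & 0x7f);
        // The tenth group may only contribute the single remaining bit.
        if i == 9 && low > 1 {
            return Err(VarintError::Overflow);
        }
        value |= low << (7 * i);
        if byte & 0x80 == 0 {
            return Ok((value, i + 1));
        }
    }
    Err(VarintError::Truncated)
}
```

Every branch returns, and the shift amount never exceeds 63, so arbitrary bytes can be fed to this function without risking a panic; that is the property a fuzz-style test asserts.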

Requires the Rust 2024 edition toolchain (Rust ≥ 1.85).

Design notes

  • Tagged-enum values (MetricValue) make type/value mismatch unrepresentable — no runtime checkType like the Java reference.
  • Topic is an enum (Node | Device | HostState) with one canonical parser, so the token-count ↔ message-type invariant is enforced by the type system.
  • seq/bdSeq are u8, so wrapping_add gives the spec's 255 → 0 wrap for free, avoiding the == 256 sentinel of the Java/Python references.
  • Decoding never panics on hostile input — every failure is a typed SparkplugError, there is a recursion limit, and arbitrary-bytes fuzzing is a property test. unsafe_code is forbidden crate-wide.
  • Engines are deterministic — no spawned tasks, no timers; the caller owns the loop, which makes them trivially testable with an in-memory transport.
  • AliasRegistry::clear clears every map (Tahu leaves the alias map populated — a known leak), and a duplicate alias in a birth is fatal (try_bind).
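
The sequence-wrap note above rests on a standard-library guarantee: u8::wrapping_add rolls 255 over to 0. A minimal sketch, using a hypothetical Seq newtype and advance method rather than the crate's actual sequence API:

```rust
/// Hypothetical stand-in for a sequence-number newtype.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Seq(pub u8);

impl Seq {
    /// Advance by one; `u8::wrapping_add` wraps 255 to 0, so no sentinel
    /// value or explicit modulo is needed.
    pub fn advance(self) -> Seq {
        Seq(self.0.wrapping_add(1))
    }
}
```

Since the counter is a u8, the out-of-range value 256 cannot be represented at all, which is what removes the need for a sentinel comparison.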

Licensing

This implementation is original (Apache-2.0 OR MIT). It is informed by the Eclipse Tahu reference implementation (EPL-2.0) but copies no Tahu source; the Sparkplug protobuf schema (field numbers / enum values) is factual.