
Crate datum

§Datum

🪼 - Just “Stream” anything :]

Datum is a Rust stream processing library built around Ractor actors and a push-based stream abstraction. Its compatibility target is the API shape and behavior of Akka/Pekko Streams Typed, adapted to Rust-native ownership, async, and benchmarking constraints.
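To make "push-based" concrete, here is a minimal std-only sketch of the idea: the source drives the pipeline by pushing each element into the next stage, and each stage forwards results downstream, instead of a consumer pulling with next(). The Push trait and the MapPush, FilterPush, and FoldPush types are hypothetical illustrations, not Datum's stage API; the pipeline computes the same value as the crate-level example further down.

```rust
// Hypothetical push interface: upstream calls `push` once per element and
// `complete` once at the end. This is an illustration, not Datum's stage API.
trait Push<T> {
    fn push(&mut self, item: T);
    fn complete(&mut self);
}

// Transforms each element and forwards the result downstream.
struct MapPush<F, D> {
    f: F,
    downstream: D,
}

impl<T, U, F: FnMut(T) -> U, D: Push<U>> Push<T> for MapPush<F, D> {
    fn push(&mut self, item: T) {
        let out = (self.f)(item);
        self.downstream.push(out);
    }
    fn complete(&mut self) {
        self.downstream.complete();
    }
}

// Forwards only the elements that satisfy the predicate.
struct FilterPush<P, D> {
    pred: P,
    downstream: D,
}

impl<T, P: FnMut(&T) -> bool, D: Push<T>> Push<T> for FilterPush<P, D> {
    fn push(&mut self, item: T) {
        if (self.pred)(&item) {
            self.downstream.push(item);
        }
    }
    fn complete(&mut self) {
        self.downstream.complete();
    }
}

// Terminal stage: folds every pushed element into an accumulator.
struct FoldPush<A, F> {
    acc: A,
    f: F,
    completed: bool,
}

impl<T, A: Copy, F: FnMut(A, T) -> A> Push<T> for FoldPush<A, F> {
    fn push(&mut self, item: T) {
        self.acc = (self.f)(self.acc, item);
    }
    fn complete(&mut self) {
        self.completed = true;
    }
}

// The source drives the pipeline: it pushes every element, then signals completion.
fn run_source<D: Push<u64>>(range: std::ops::Range<u64>, downstream: &mut D) {
    for item in range {
        downstream.push(item);
    }
    downstream.complete();
}

// Map +1, keep even values, sum them.
fn sum_even_successors(n: u64) -> u64 {
    let mut pipeline = MapPush {
        f: |item: u64| item + 1,
        downstream: FilterPush {
            pred: |item: &u64| *item % 2 == 0,
            downstream: FoldPush { acc: 0_u64, f: |acc: u64, item: u64| acc + item, completed: false },
        },
    };
    run_source(0..n, &mut pipeline);
    let sink = &pipeline.downstream.downstream;
    assert!(sink.completed, "source must signal completion");
    sink.acc
}

fn main() {
    println!("{}", sum_even_successors(1_000)); // 250500
}
```

Because each stage is a concrete generic type, calls between stages are statically dispatched and can be inlined into a single loop; the sketch makes no claim about how Datum's own fused execution is implemented.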

§Install

Add Datum to your project from crates.io:

cargo add datum-core

Or in Cargo.toml:

[dependencies]
datum-core = "0.9"

Note: The package is published as datum-core (the name datum is taken on crates.io), but the import path stays use datum::… — the crate’s library name is datum.

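To depend on the Git repository directly (pinned here to the v0.9.0 tag; to track an unreleased commit, replace tag with Cargo's rev or branch key):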

[dependencies]
datum-core = { git = "https://github.com/Aethergrids/Datum", tag = "v0.9.0" }

§Upstream References

  • Ractor crate: 0.15.13
  • Optional Ractor cluster crate: 0.15.13, enabled with --features cluster
  • Akka source submodule: third_party/akka, tracking upstream main and pinned in this repository at commit 58f1f6db2e505e87f5dc115ee9476833872e7ae0
  • Latest stable Akka tag observed during setup: v2.10.18

§Feature highlights (v0.9.0)

  • Concurrency primitives (datum::concurrent) — Signal<T> (latest-value state cell with a lock-free get() and a coalesced change feed), Subscription<T> (the same state cell with a lossless every-change feed), Channel<T> (closeable bounded MPSC), and Topic<T> (pub-sub broadcast with backpressure/sliding/dropping overflow policies). Benchmarked 4-way against FS2, ZIO, and Akka: every row beats the best JVM competitor on both wall-clock and CPU.
  • API ergonomics (additive, zero semantic change): a prelude (use datum::prelude::*;), terminal shortcuts (run_fold/run_foreach/run_reduce), (0..n).into_source() / Source::from(vec), the graph wiring DSL builder.wire(a.to(&b)) (with connect() still first-class), and idiomatic try_* fallible operators (try_map, try_filter, …; the older *_result names are deprecated).
  • Complete Akka Streams surface — the linear DSL, GraphDSL with typed junctions and cycles, substreams, dynamic streams (kill switches, hubs), StreamRefs (same-process splice + remote transport via datum-net), streaming IO, and IO adapters bridging to std::io::Read/Write.
  • #![forbid(unsafe_code)] on the datum crate itself; lock-free internals come from safe deps (crossbeam-queue, arc-swap).
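The difference between Signal<T>'s coalesced change feed and Subscription<T>'s lossless every-change feed can be illustrated with a small single-threaded sketch. StateCell, CoalescedReader, and LosslessReader are hypothetical names; Datum's real types are concurrent (Signal's get() is described as lock-free), and their actual APIs are not modeled here. The unbounded change log is a simplification.

```rust
// Hypothetical single-threaded state cell; Datum's Signal/Subscription are
// concurrent and their real APIs are not modeled here.
struct StateCell<T: Clone> {
    value: T,
    version: u64, // bumped on every set
    log: Vec<T>,  // every change in order; unbounded here for simplicity
}

impl<T: Clone> StateCell<T> {
    fn new(initial: T) -> Self {
        Self { value: initial, version: 0, log: Vec::new() }
    }

    fn get(&self) -> T {
        self.value.clone()
    }

    fn set(&mut self, value: T) {
        self.log.push(value.clone());
        self.value = value;
        self.version += 1;
    }
}

// Coalesced feed: reports at most the latest value since the last poll;
// intermediate values written in between are skipped.
struct CoalescedReader {
    seen: u64,
}

impl CoalescedReader {
    fn poll<T: Clone>(&mut self, cell: &StateCell<T>) -> Option<T> {
        if cell.version == self.seen {
            return None;
        }
        self.seen = cell.version;
        Some(cell.get())
    }
}

// Lossless feed: reports every change since the last poll, in order.
struct LosslessReader {
    next: usize,
}

impl LosslessReader {
    fn poll<T: Clone>(&mut self, cell: &StateCell<T>) -> Vec<T> {
        let changes = cell.log[self.next..].to_vec();
        self.next = cell.log.len();
        changes
    }
}

fn main() {
    let mut cell = StateCell::new(0);
    let mut coalesced = CoalescedReader { seen: 0 };
    let mut lossless = LosslessReader { next: 0 };
    for value in 1..=3 {
        cell.set(value);
    }
    println!("{:?}", coalesced.poll(&cell)); // Some(3)
    println!("{:?}", lossless.poll(&cell)); // [1, 2, 3]
    println!("{:?}", coalesced.poll(&cell)); // None
}
```

A coalesced reader suits "what is the state now?" consumers, because a slow reader never falls behind; a lossless reader suits consumers that must observe every transition, at the cost of buffering.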

§Development

cargo test
cargo check --benches
cargo bench --bench push_baseline
cargo bench --bench source_flow
cargo bench --bench materialization
cargo bench --bench graph
cargo bench --bench actor_ask
benches/actor_ask_compare/run.sh

A minimal linear pipeline (map, filter, then fold to a single value):

use datum::prelude::*;

let sum = (0_u64..1_000)
    .into_source()
    .map(|item| item + 1)
    .filter(|item| item % 2 == 0)
    .run_fold(0_u64, |acc, item| acc + item)
    .unwrap()
    .wait()
    .unwrap();

assert_eq!(sum, 250_500);

§Benchmarks

Datum is benchmarked head-to-head against warmed Akka/Pekko Streams across eight core areas — Source/Flow, materialization, graph/junctions, actor ask, dynamic streams, streaming IO, substreams, and queues — plus network carriers (datum-net) and a 4-way concurrency-primitives comparison against FS2, ZIO, and Akka. Honest per-path numbers (including any at-parity rows) are in the result tables under roadmap/benchmarks/. The harness adds a Datum CPU us/op column on purpose: some wins come partly from busy-spinning while a JVM parks — a real cost the wall-clock number hides.
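The CPU column matters because a busy-spinning waiter keeps a core busy while it waits, whereas a parked thread uses no CPU until it is woken, at the price of wake-up latency. The std-only sketch below shows a common hybrid: spin for a bounded number of iterations, then park. SPIN_LIMIT and wait_for_flag are hypothetical names, and nothing here claims this is how Datum's internals wait.

```rust
use std::hint;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical spin budget before falling back to parking.
const SPIN_LIMIT: u32 = 1_000;

// Waits until `flag` becomes true and returns how many spin iterations ran.
// Spinning reacts fastest but burns CPU; parking is cheap in CPU but slower to wake.
fn wait_for_flag(flag: &AtomicBool) -> u32 {
    let mut spins = 0;
    while !flag.load(Ordering::Acquire) {
        if spins < SPIN_LIMIT {
            spins += 1;
            hint::spin_loop(); // busy-wait: this time shows up as CPU per op
        } else {
            // An unpark that arrives before we park leaves a token, so park
            // returns immediately; the loop re-checks the flag because park
            // may also wake spuriously.
            thread::park();
        }
    }
    spins
}

fn main() {
    let flag = Arc::new(AtomicBool::new(false));
    let waiter = {
        let flag = Arc::clone(&flag);
        thread::spawn(move || wait_for_flag(&flag))
    };
    thread::sleep(Duration::from_millis(10));
    flag.store(true, Ordering::Release);
    waiter.thread().unpark(); // wake the waiter if it has already parked
    let spins = waiter.join().unwrap();
    println!("spin iterations before the flag was seen or the waiter parked: {spins}");
}
```

A benchmark that reports only wall-clock time cannot distinguish these two strategies when the wait is short, which is why a per-operation CPU figure is reported alongside it.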

Qualitative summary of the current state:

  • Fused linear path: 13–46x vs Akka (typed plan; all common sink shapes).
  • Junctions: 2.3–340x; typed kernels cover all major shapes; graph cycles ~29x.
  • Concurrency primitives (vs best of FS2/ZIO/Akka per row): Channel MPSC ×1024 7.5x wall / 20.5x CPU; Signal reads ~ns-class (3.6–7.8x vs ZIO); Signal propagation ×1024 83x; Subscription lossless ×256 43x; Topic fanout ×1024 23x.
  • Remote StreamRefs (forced-remote Akka Artery-TCP, fair): ~9.9x faster wall, ~14.8x lower CPU, ~35x lower allocation; same-process splice ~6.5x.
  • Dynamic hubs: MergeHub p16 ~16x (p1 ~0.46x — core materialization floor, documented); BroadcastHub 1.3–2x; PartitionHub 1.4–2.7x.
  • Queues: source-queue backpressure ~985x; bounded queue ~parity wall with ~112x less allocation.
  • Actor ask: ~2.4–3x at p2–p4; allocation dominated by upstream Ractor boxing (~848 B/element, upstream-blocked).
  • Streaming IO adapters: ~7.4x on the round-trip scenario.
  • Materialization: at parity.

Run a comparison (requires a JDK and sbt); each runner writes a rendered table under target/<area>-comparison/:

benches/source_flow_compare/run.sh
benches/materialization_compare/run.sh
benches/graph_compare/run.sh
benches/actor_ask_compare/run.sh
benches/dynamic_streams_compare/run.sh
benches/streaming_io_compare/run.sh
benches/substreams_compare/run.sh

Current result tables, the per-operator coverage matrix, and apples-to-apples caveats:

  • roadmap/benchmarks/source-flow.md
  • roadmap/benchmarks/materialization.md
  • roadmap/benchmarks/graph.md
  • roadmap/benchmarks/actor-ask.md
  • roadmap/benchmarks/dynamic-streams.md
  • roadmap/benchmarks/streaming-io.md
  • roadmap/benchmarks/substreams.md
  • roadmap/M1-v0.1.0-foundation.md — coverage matrix, optimization status & apples-to-apples caveats
  • roadmap/M4-v0.4.0-completeness-hardening.md — M4 work packages and exit criteria

See roadmap/ for the full milestone roadmap. User documentation is the published VitePress site (Cloudflare Pages); docs/ holds its source. roadmap/ holds internal planning and benchmark records.

Re-exports§

pub use actor::ActorFlow;
pub use actor::ActorPubSub;
pub use actor::ActorSink;
pub use actor::ActorSinkBackpressureMessage;
pub use actor::ActorSinkMessage;
pub use actor::ActorSource;
pub use actor::ActorSourceMessage;
pub use actor::ActorStatus;
pub use actor::ReplyPort;
pub use actor::ReplySendError;
pub use actor::SinkRef;
pub use actor::SourceRef;
pub use actor::StreamRefFrame;
pub use actor::StreamRefId;
pub use actor::StreamRefMessage;
pub use actor::StreamRefOutbound;
pub use actor::StreamRefPayload;
pub use actor::StreamRefPayloadBatch;
pub use actor::StreamRefPayloadBytes;
pub use actor::StreamRefProtoConsumer;
pub use actor::StreamRefProtoEndpoint;
pub use actor::StreamRefProtoProducer;
pub use actor::StreamRefSettings;
pub use actor::StreamRefs;
pub use actor::WatchEvent;
pub use concurrent::Signal;
pub use concurrent::Subscription;
pub use concurrent::SubscriptionOverflow;
pub use concurrent::Topic;
pub use concurrent::TopicOverflow;
pub use concurrent::TopicPublishError;
pub use concurrent::TopicTryPublishError;
pub use concurrent::channel::Channel;
pub use concurrent::channel::SendError as ChannelSendError;
pub use concurrent::channel::TrySendError;
pub use context::FlowWithContext;
pub use context::SourceWithContext;
pub use dynamic::BroadcastHub;
pub use dynamic::BroadcastHubConsumerSource;
pub use dynamic::KillSwitches;
pub use dynamic::MergeHub;
pub use dynamic::MergeHubDrainingControl;
pub use dynamic::PartitionConsumerInfo;
pub use dynamic::PartitionHub;
pub use dynamic::PartitionHubConsumerSource;
pub use dynamic::SharedKillSwitch;
pub use dynamic::UniqueKillSwitch;
pub use graph::AnyInlet;
pub use graph::AnyOutlet;
pub use graph::AsyncBoundary;
pub use graph::AsyncBoundaryExecutionConfig;
pub use graph::Balance;
pub use graph::BidiShape;
pub use graph::Broadcast;
pub use graph::Buffer;
pub use graph::Concat;
pub use graph::FanInShape;
pub use graph::FanOutShape;
pub use graph::FanOutShape2;
pub use graph::FlowShape;
pub use graph::FlowShape as GraphFlowShape;
pub use graph::FusedExecutionConfig;
pub use graph::FusedExecutionReport;
pub use graph::FusedTerminalReport;
pub use graph::Graph;
pub use graph::GraphBlueprint;
pub use graph::GraphBuilder;
pub use graph::GraphDsl;
pub use graph::GraphStage;
pub use graph::GraphStageLogic;
pub use graph::Identity;
pub use graph::ImportedGraph;
pub use graph::InHandler;
pub use graph::Inlet;
pub use graph::InletCursor;
pub use graph::Interleave;
pub use graph::MapStage;
pub use graph::Merge;
pub use graph::MergeLatest;
pub use graph::MergePreferred;
pub use graph::MergePreferredShape;
pub use graph::MergePrioritized;
pub use graph::MergeSequence;
pub use graph::MergeSorted;
pub use graph::OrElse;
pub use graph::OutHandler;
pub use graph::Outlet;
pub use graph::OutletCursor;
pub use graph::PartialGraph;
pub use graph::Partition;
pub use graph::PortAllocator;
pub use graph::PortId;
pub use graph::PortKind;
pub use graph::SinkShape;
pub use graph::SourceShape;
pub use graph::StageSpec;
pub use graph::TakeWhile;
pub use graph::TimerHandler;
pub use graph::Unzip;
pub use graph::UnzipWith;
pub use graph::WireDsl;
pub use graph::WirePair;
pub use graph::WireSpec;
pub use graph::Zip;
pub use graph::ZipShape;
pub use io::Compression;
pub use io::FileIO;
pub use io::Framing;
pub use io::FramingByteOrder;
pub use io::InputStreamHandle;
pub use io::IoResult;
pub use io::OutputStreamHandle;
pub use io::StreamConverters;
pub use io::TcpBinding;
pub use io::TcpConnection;
pub use io::TcpIncomingConnection;
pub use io::TokioByteSink;
pub use io::TokioByteSource;
pub use io::TokioFileIO;
pub use io::TokioTcp;
pub use queue::BoundedSourceQueue;
pub use queue::QueueOfferResult;
pub use queue::SinkQueue;
pub use queue::SourceQueue;
pub use stream::AggregateTimer;
pub use stream::BidiFlow;
pub use stream::Cancellable;
pub use stream::DelayOverflowStrategy;
pub use stream::Demand;
pub use stream::Flow;
pub use stream::IntoSource;
pub use stream::Keep;
pub use stream::Materializer;
pub use stream::MaybeHandle;
pub use stream::NotUsed;
pub use stream::OverflowStrategy;
pub use stream::PushOutlet;
pub use stream::RestartFlow;
pub use stream::RestartSettings;
pub use stream::RestartSink;
pub use stream::RestartSource;
pub use stream::RetryFlow;
pub use stream::RunnableGraph;
pub use stream::Runtime;
pub use stream::Sink;
pub use stream::SinkCombineStrategy;
pub use stream::Source;
pub use stream::SourceCombineStrategy;
pub use stream::StreamCompletion;
pub use stream::StreamError;
pub use stream::StreamResult;
pub use stream::Supervision;
pub use stream::SupervisionDecider;
pub use stream::SupervisionDirective;
pub use stream::ThrottleMode;

Modules§

actor
Actor interop: Ractor-backed stream bridges plus the remote StreamRefs seam.
concurrent
Stream-native concurrency primitives.
context
Context-aware stream wrappers.
dynamic
Dynamic stream controls and attachment points modeled after Akka Streams.
graph
Runtime-checked graph, shape, port, and junction primitives.
io
Streaming I/O — file and TCP sources/sinks, byte framing, compression, and std::io bridges.
prelude
queue
Bounded and unbounded queues that bridge external producers into a Datum stream.
stream
The linear Source/Flow/Sink DSL and the runtime that materializes it — Datum’s primary public surface, mirroring Akka/Pekko Streams Typed.
testkit
Stream testing probes built on Datum’s normal runtime.
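A bounded queue that bridges external producers into a stream has to decide what an offer does when the buffer is full. The std-only sketch below shows two common answers, named in the ZIO sense used by Topic's overflow policies: "dropping" rejects the incoming element, "sliding" evicts the oldest buffered one. The third policy, backpressure, makes the producer wait and needs a concurrent queue, so it is left out. BoundedBuffer, Overflow, and Offer are hypothetical names loosely modeled on Akka's QueueOfferResult; they are not Datum's BoundedSourceQueue, QueueOfferResult, or TopicOverflow API.

```rust
use std::collections::VecDeque;

// Hypothetical overflow policies, named after ZIO's dropping/sliding queues.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Overflow {
    DropNewest, // "dropping": reject the incoming element
    DropOldest, // "sliding": evict the oldest buffered element
}

// Hypothetical offer result, loosely modeled on Akka's QueueOfferResult.
#[derive(Debug, PartialEq)]
enum Offer<T> {
    Enqueued { evicted: Option<T> }, // accepted; sliding may evict the oldest element
    Dropped(T),                      // rejected because the buffer was full
    Closed(T),                       // rejected because the queue was closed
}

struct BoundedBuffer<T> {
    items: VecDeque<T>,
    capacity: usize,
    policy: Overflow,
    closed: bool,
}

impl<T> BoundedBuffer<T> {
    fn new(capacity: usize, policy: Overflow) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        Self { items: VecDeque::with_capacity(capacity), capacity, policy, closed: false }
    }

    // Producer side: try to add an element without blocking.
    fn offer(&mut self, item: T) -> Offer<T> {
        if self.closed {
            return Offer::Closed(item);
        }
        if self.items.len() < self.capacity {
            self.items.push_back(item);
            return Offer::Enqueued { evicted: None };
        }
        match self.policy {
            Overflow::DropNewest => Offer::Dropped(item),
            Overflow::DropOldest => {
                // The buffer is full and capacity > 0, so there is an oldest element.
                let evicted = self.items.pop_front();
                self.items.push_back(item);
                Offer::Enqueued { evicted }
            }
        }
    }

    fn close(&mut self) {
        self.closed = true;
    }

    // Downstream side: take the next buffered element, if any.
    fn poll(&mut self) -> Option<T> {
        self.items.pop_front()
    }
}

fn main() {
    let mut dropping = BoundedBuffer::new(2, Overflow::DropNewest);
    let mut sliding = BoundedBuffer::new(2, Overflow::DropOldest);
    for v in 1..=3 {
        println!("dropping offer {v}: {:?}", dropping.offer(v));
        println!("sliding  offer {v}: {:?}", sliding.offer(v));
    }
    // dropping keeps [1, 2] and rejected 3; sliding keeps [2, 3] and evicted 1.
    println!("{:?} {:?}", dropping.items, sliding.items);
}
```

Returning the discarded element in the result lets the producer log or retry it instead of losing it silently.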

Structs§

ActorRef
An ActorRef is a strongly-typed wrapper over an ActorCell to provide some syntactic wrapping on the requirement to pass the actor’s message type everywhere.
Attributes
An ordered, immutable collection of Attributes.

Enums§

Attribute
A single piece of stream/stage metadata.