Expand description
The sift_stream crate is primarily focused on streaming telemetry to Sift in a robust manner.
Here are some features highlights:
- Builtin retries with default or custom retry policies in the case of a Sift outage or a client-side network outage.
- Periodic checkpointing to confirm that all data within a particular period has been received by Sift.
- Optional automated backups to mitigate data-loss in the case of misc. outages.
- Optional tracing/logging to monitor the health of your stream and view various ingestion performance metrics.
Users of this crate will only have to initialize a single instance of SiftStream which they would then use for the entirety of data ingestion for a given asset.
Comprehensive examples can be found in the examples directory of this crate.
§Quick-start
// Define the schema of your telemetry
let ingestion_config = IngestionConfigForm {
asset_name: "MarsRover0".into(),
client_key: "mars-rover0-ingestion-config-v1".into(),
flows: vec![FlowConfig {
name: "robotic-arm".into(),
channels: vec![ChannelConfig {
name: "joint-angle-encoder".into(),
description: "measures the angular position of the arm’s joints".into(),
data_type: ChannelDataType::Double.into(),
unit: "degrees".into(),
..Default::default()
}],
}],
};
// Initialize your Sift Stream (live streaming with backups)
let mut sift_stream = SiftStreamBuilder::new(credentials)
.ingestion_config(ingestion_config.clone())
.live_with_backups()
.build()
.await?;
let flow = Flow::new(
"robotic-arm",
TimeValue::now(),
&[ChannelValue::new("joint-angle-encoder", 7.2_f64)],
);
// Send telemetry to Sift
sift_stream.send(flow).await?;
// Gracefully terminate your stream
sift_stream.finish().await?;
// Alternatively, initialize in FileBackupMode (only writes to backup files)
use sift_stream::backup::DiskBackupPolicy;
use std::path::PathBuf;
let mut backup_stream = SiftStreamBuilder::new(credentials)
.ingestion_config(ingestion_config)
.file_backup()
.disk_backup_policy(DiskBackupPolicy {
backups_dir: Some(PathBuf::from("/data/backups")),
..Default::default()
})
.build()
.await?;
// Send telemetry to backup files (not to Sift)
backup_stream.send(flow).await?;
// Gracefully terminate and flush backup files
backup_stream.finish().await?§Stream Modes
Three transport modes are available, selected via the builder chain. Each mode determines the channel architecture, backpressure behavior, and durability guarantees:
| Builder method | Transport type | Backpressure source | Checkpointing | Disk backup | Retries |
|—|—|—|—|—|
| .live_only() | LiveStreamingOnly | ingestion channel | No | No | Yes |
| .live_with_backups() | LiveStreamingWithBackups | backup channel | Yes | Yes | Yes |
| .file_backup() | FileBackup | write channel | No | Yes | N/A |
§live_only — LiveStreamingOnly
Streams to Sift in real-time over a single bounded ingestion channel. Backpressure is applied
directly when that channel is full: send awaits until the
ingestion task drains capacity. Supports retries.
let stream = SiftStreamBuilder::new(credentials)
.ingestion_config(ingestion_config)
.live_only()
.build()
.await?;§live_with_backups — LiveStreamingWithBackups
Streams to Sift in real-time using a dual-channel architecture: a bounded backup channel and a bounded ingestion channel. Backpressure is applied on the backup channel; the ingestion channel uses force-send and never blocks — when full it evicts the oldest buffered message to preserve freshness. Supports retries, checkpointing, and an disk backup strategy for intermittent network failures.
let stream = SiftStreamBuilder::new(credentials)
.ingestion_config(ingestion_config)
.live_with_backups()
.build()
.await?;§file_backup — FileBackup
Writes telemetry to rolling disk files without any network ingestion. Backpressure is
applied on the bounded write channel: send awaits until the
file-writer task drains capacity. Data written in this mode can be ingested into Sift later
using the sift-cli tool.
Useful for CI/CD workflows where data only needs to reach Sift if a test fails, or in
environments where network connectivity is unavailable during the recording session.
disk_backup_policy.backups_dir must be set via FileBackupBuilder::disk_backup_policy.
let stream = SiftStreamBuilder::new(credentials)
.ingestion_config(ingestion_config)
.file_backup()
.disk_backup_policy(DiskBackupPolicy {
backups_dir: Some(PathBuf::from("/data/backups")),
..Default::default()
})
.build()
.await?;§Ingestion Configs
Both stream modes use ingestion-config-based streaming, which requires users to define the schema of their telemetry before they start telemetering data. The key parts of an ingestion config are:
- Asset name: The name of the asset associated with the data that will be streamed.
- Client key: An arbitrary user-sourced identifier that uniquely identifies the ingestion
config; this can be used to achieve client-side versioning e.g.
mars-rover0-sim-v1. - Flows configs: A list of flow configurations. Simply put, a flow configuration is a named group of channels that are often telemetered together; a flow is a single message that contains a list of channel values that share a common timestamp. When sending a flow to Sift, it is expected that the flow has a corresponding flow configuration.
The following is an example of a valid ingestion config for the MarsRover0 asset:
let ingestion_config = IngestionConfigForm {
asset_name: "MarsRover0".into(),
client_key: "mars-rover0-ingestion-config-v1".into(),
flows: vec![
FlowConfig {
name: "robotic-arm".into(),
channels: vec![ChannelConfig {
name: "joint-angle-encoder".into(),
description: "measures the angular position of the arm’s joints".into(),
data_type: ChannelDataType::Double.into(),
unit: "degrees".into(),
..Default::default()
}],
},
FlowConfig {
name: "navigation-system".into(),
channels: vec![
ChannelConfig {
name: "gps-receiver".into(),
description: "measures latitude and longitude".into(),
data_type: ChannelDataType::Int32.into(),
unit: "degrees".into(),
..Default::default()
},
ChannelConfig {
name: "imu".into(),
description: "measures acceleration and angular velocity".into(),
data_type: ChannelDataType::Float.into(),
unit: "m/s^2, deg/s".into(),
..Default::default()
},
],
},
],
};Here is an example of how streaming data into Sift might look using this ingestion config:
let mut sift_stream = SiftStreamBuilder::new(credentials)
.ingestion_config(ingestion_config)
.live_with_backups()
.build()
.await?;
// Send data for the `robotic-arm` flow.
sift_stream.send(Flow::new(
"robotic-arm",
TimeValue::now(),
&[ChannelValue::new("joint-angle-encoder", 7.2_f64)],
)).await?;
// Send data for the `navigation-system` flow. Notice that the order of the channels
// don't need to be in the same order as they are specified in the `navigation-system`
// flow configuration.
sift_stream.send(Flow::new(
"navigation-system",
TimeValue::now(),
&[
ChannelValue::new("imu", 9.7_f32),
ChannelValue::new("gps-receiver", 10_i32),
],
)).await?;
// Send partial data for the `navigation-system` flow. This totally fine.
sift_stream.send(Flow::new(
"navigation-system",
TimeValue::now(),
&[ChannelValue::new("imu", 9.7_f32)]
)).await?;
// Gracefully terminate your stream
sift_stream.finish().await?§Modifying Ingestion Configs
Ingestion configs should be re-used whenever possible. Simply reusing the same IngestionConfigForm form should allow proper re-use of your ingestion config. If you need to update your ingestion config, it has to be done in a backwards compatible manner. The following changes are considered backwards compatible:
- Adding a new FlowConfig
- Changing the name of an existing FlowConfig (this will simply create a new one)
- If you change the name of a FlowConfig you are also able to edit its ChannelConfigs safely.
- Removing an entire FlowConfig from the list of flow configs
Important Note: Changing an existing FlowConfig in any way is considered a backwards
incompatible change. E.g. say we were to change our joint-angle-encoder’s type in the
robotic-arm flow configuration from a double to an int32 - if this were the case we will end
up with the following error when calling [SiftStreamBuilder::build]:
Error: failed to initialize Sift stream
Caused by:
[IncompatibleIngestionConfigChange]: flow(s) with name 'robotic-arm' exist but their channel configs do not match what the user specified
[cause]:
- incompatible change to ingestion config
[help]:
- Did you modify an existing flow? Try updating the 'client_key' of `sift_stream::IngestionConfigForm`In this situation, simply updating the client key to be something like mars-rover0-ingestion-config-v2 will create a new
ingestion config which will allow users to proceed normally.
§Summary
In summary, re-use an existing ingestion config as much as possible. The following changes can be made without updating the client key:
- Adding a new FlowConfig
- Changing the name of an existing FlowConfig (this will simply create a new one)
- If you change the name of a FlowConfig you are also able to edit its ChannelConfigs safely.
- Removing an entire FlowConfig from the list of flow configs
Anything that falls outside of that will require changing the client-key.
§Sending Telemetry
SiftStream exposes four methods for delivering telemetry. They differ only in whether
they apply backpressure (blocking) or return immediately (non-blocking), and in whether
they accept a high-level Flow or pre-encoded raw requests:
| Method | Blocks? | Input |
|---|---|---|
send | Yes | Flow or any Encodeable |
send_requests | Yes | Pre-encoded requests |
try_send | No | Flow or any Encodeable |
try_send_requests | No | Pre-encoded requests |
§Backpressure with send
send awaits until the backing channel has capacity, then
delivers the message. Use this when you want the producer to slow down naturally when
the pipeline is under load — the simplest and most common choice.
// Awaits until the channel has room; backpressure is applied automatically.
sift_stream.send(Flow::new(
"robotic-arm",
TimeValue::now(),
&[ChannelValue::new("joint-angle-encoder", 7.2_f64)],
)).await?;On error, SiftStreamSendError is returned. Call into_inner() on the
ChannelClosed variant to recover the
undelivered message.
§Non-blocking sends with try_send
try_send returns immediately regardless of channel
state. If the channel is full it returns TrySendError::Full with the message; if
the channel is closed it returns TrySendError::Closed. Use this in tight loops or
real-time contexts where blocking even briefly is unacceptable.
match sift_stream.try_send(Flow::new(
"robotic-arm",
TimeValue::now(),
&[ChannelValue::new("joint-angle-encoder", 7.2_f64)],
)) {
Ok(()) => {}
Err(SiftStreamTrySendError::Channel(TrySendError::Full(msg))) => {
// Channel is busy — drop this sample or buffer it for later.
drop(msg);
}
Err(e) => return Err(e.into()),
}§Pre-encoded batch sends
send_requests and
try_send_requests accept pre-encoded
IngestWithConfigDataStreamRequest
values built with FlowBuilder. This skips the per-call encoding step and is the
highest-throughput option.
let descriptor = sift_stream.get_flow_descriptor("robotic-arm").unwrap();
let run_id = sift_stream.run().unwrap().run_id.clone();
let mut builder = FlowBuilder::new(&descriptor);
builder.attach_run_id(&run_id);
builder.set_with_key("joint-angle-encoder", 7.2_f64).unwrap();
// Blocking batch send with backpressure:
sift_stream.send_requests(vec![builder.request(TimeValue::now())]).await?;
// Non-blocking batch send:
sift_stream.try_send_requests(vec![builder.request(TimeValue::now())])?;On the first failure, send_requests / try_send_requests stop iterating and return
all undelivered messages — the failing one plus any not yet attempted — inside the
error so nothing is silently dropped.
§Retry Policy
At the time of writing this crate, tonic
doesn’t natively support gRPC retries.
sift_stream, however has its own internal mechanism to handle retries to handle the following
cases:
- Client-side network outages
- Transient Sift outages
- Transient errors from Sift’s gRPC service
- Transient errors that may arise from load balancers
Retries are always enabled when using live_with_backups() mode. Users can configure the
retry policy via LiveWithBackupsBuilder::retry_policy:
let mut sift_stream = SiftStreamBuilder::new(credentials)
.ingestion_config(ingestion_config)
.live_with_backups()
.retry_policy(RetryPolicy::default())
.build()
.await?;For more information see LiveWithBackupsBuilder::retry_policy, RetryPolicy, and DiskBackupPolicy.
§Checkpoints
Checkpointing enables clients to receive periodic acknowledgements from Sift, confirming that all data up to the moment the checkpoint was requested has been received. Checkpointing happens periodically and is enabled by default to occur are a regular interval, with the default interval being stream::builder::DEFAULT_CHECKPOINT_INTERVAL. Users can, however, specify their own custom checkpoint interval:
let mut sift_stream = SiftStreamBuilder::new(credentials)
.ingestion_config(ingestion_config)
.live_with_backups()
.checkpoint_interval(Duration::from_secs(30))
.build()
.await?;For more information see LiveWithBackupsBuilder::checkpoint_interval.
Important Note: Bear in mind that checkpointing does introduce a bit of overhead, as SiftStream will be re-creating the gRPC stream. gRPC stream, so very small checkpoint intervals are not recommended.
§Concluding a stream
To conclude a stream and receive the final checkpoint acknowledgement from Sift, it is important that users call SiftStream::finish at the end of their stream, otherwise the stream may terminate prematurely resulting in data-loss at the tail-end.
§Backups
Streaming data to Sift is generally very robust and stable, however, due to the asynchronous nature of gRPC streaming, if an error occurs while the user is calling SiftStream::send between checkpoints there is no guarantee that the data sent at the moment the error was triggered on the other end successfully reached Sift.
While checkpointing gives clients assurance that all data has been received up to a certain point, checkpointing alone doesn’t protect against data loss between checkpoints.
To protect against data-loss sift_stream offers a backup mechanism providing async backups
and reingestion of data.
This backup method is disabled by default as it introduces some overhead, but can be enabled like so:
// Async Backups
let mut sift_stream = SiftStreamBuilder::new(credentials)
.ingestion_config(ingestion_config)
.live_with_backups()
.disk_backup_policy(DiskBackupPolicy::default())
.build()
.await?;This example initializes backup strategies with the recommended defaults. This includes the default retry policy, the backups directory, and a max individual backup file size with unlimited rolling backup files, which are deleted once a successful checkpoint is observed. If users wish to configure their own backup settings, see LiveWithBackupsBuilder::disk_backup_policy.
§Retry with Backups
The live_with_backups() mode with a disk_backup_policy configured writes messages passed to SiftStream::send to rolling backup
files in a buffered manner. Once the file size is reached for a given file, that backup is closed out and
the next file is created. If the maximum file count is reached, a checkpoint will be forced. Once a
checkpoint is triggered, if passed, the backup file(s) are deleted unless the DiskBackupPolicy specifies
to retain backups. If a checkpoint fails, the backup files(s) are sent to a separate ingestion task which
re-ingests each file into Sift.
Backup file re-ingestion is performed on each file in the same order as the data was initially sent, with an indefinite number of retries on a file until a successful ingestion response is recieved by Sift. Files are deleted as specified per the DiskBackupPolicy retention policy only once re-ingestion has been confirmed.
§Data Integrity
Important Note: This section only pertains to the disk-based-backup strategy.
The backup files are periodically written to and synced. Each chunk of data written to the backup includes a checksum computed from the chunk itself. When chunks are read back into memory, their checksums are recomputed and compared against the stored values. If a mismatch is detected, the affected chunk and all subsequent chunks are considered corrupt and will be ignored. See the tracing section for details on enabling logs that notify users when this occurs.
§Guarantees
The current backup strategy implementations will protect against data-loss but they do potentially come at a performance cost depending on several variables. The default configurations should satisfy most use-cases, however, users are encouraged to provide their own custom configurations based on their baseline message and byte-rates if they notice backups causing performance issues. Please refer to the tracing section for information on how to get performance metrics.
§Tracing
sift_stream only comes with the tracing feature flag which is enabled by default. With the
tracing feature flag users can observe the health of their Sift stream as well as
performance metrics that are conducive to debugging. The following is an example of the types
of traces users would see with tracing enabled and with RUST_LOG set to the following
RUST_LOG=sift_stream=info
2025-03-23T01:33:45.193457Z INFO an existing ingestion config was found with the provided client-key ingestion_config_id="181bd784-827f-4f3f-a045-ef6b4df6505f"
2025-03-23T01:33:45.335279Z INFO created new run run_id="2acff183-9cb4-44f4-a811-1c76d64ce77f" run_name="millenium-falcon-ep4-1742693624989"
2025-03-23T01:33:45.335801Z INFO Sift streaming successfully initialized
2025-03-23T01:34:45.339499Z INFO initiating checkpoint
2025-03-23T01:34:45.345700Z INFO stream_duration="60s" messages_processed=91823712 message_rate="1530395.2 messages/s" bytes_processed="1.176 GiB" byte_rate="19.6 MiB/s"
2025-03-23T01:34:46.347922Z INFO checkpoint acknowledgement received from Sift - resuming stream
2025-03-23T01:34:46.348091Z INFO successfully initialized a new stream to SiftA quick-start example:
Install sift_stream:
cargo add sift_streamThen install tracing_subscriber with the fmt and env-filter feature flags:
cargo add tracing_subscriber --features fmt,env-filterThen configure a tracing subscriber before you start streaming like so:
tracing_subscriber::fmt()
.with_target(false)
.with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env())
.init();Now when you execute your program, you can control sift_stream severity levels using the
RUST_LOG environment variable. Here are a few examples:
RUST_LOG=sift_stream=debugRUST_LOG=sift_stream=info
See tracing-subscriber and tracing for further details.
If you do not wish to enable the tracing feature, then simply install sift_stream without
the flag like so:
cargo add sift_stream --no-default-features§Metrics
SiftStream records metrics related to the performance and operational status separately for each SiftStream instance.
While some metrics are provided through tracing, users may expose the ability to access these metrics
by enabled the optional metrics-unstable feature flag.
Metrics are currently considered an unstable feature, and future updates may break the existing metrics API.
When the metrics-unstable feature flag is enabled, users may currently access metrics through one of two methods:
SiftStream::get_metrics_snapshotreturns a SiftStreamMetricsSnapshot- Enable the light weight HTTP metrics server using [metrics::start_metrics_server], which exposes the
/and/metricsendpoints, providing a JSON formatted struct of each sift-stream-id and its SiftStreamMetricsSnapshot
Snapshots of the metrics are taken at any time the user calls SiftStream::get_metrics_snapshot or sends a GET request to the metrics
server endpoints. Metrics are internally updated atomically, and calls to get metric snapshots are non-blocking to SiftStream
operaration.
§Tokio
Because tonic is an underlying dependency, the
tokio asynchronous runtime is required, otherwise
attempts to use this crate will result in a panic at the level of .build().
This crate is compatible with both the current and multi-threaded Tokio runtimes. Performance is expected to be better generally using the multi-threaded runtime.
§Feature flags
default: Includes thetracingfeature flagtracing: Enables logging of SiftStream through the Tracing crate. See tracingmetrics-unstable: Enables the ability for the user to access SiftStream metrics from each SiftStream instance, or through a light-weight HTML metrics server, if enabled. See metrics
Re-exports§
pub use stream::RetryPolicy;pub use stream::SiftStream;pub use stream::builder::FileBackupBuilder;pub use stream::builder::IngestionConfigForm;pub use stream::builder::LiveOnlyBuilder;pub use stream::builder::LiveWithBackupsBuilder;pub use stream::builder::RunForm;pub use stream::builder::SiftStreamBuilder;pub use stream::builder::StreamConfigBuilder;pub use stream::channel::ChannelEnum;pub use stream::channel::ChannelValue;pub use stream::channel::Value;pub use stream::mode::file_backup::FileBackup;pub use stream::mode::ingestion_config::Flow;pub use stream::mode::ingestion_config::IngestionConfigEncoder;pub use stream::mode::live_only::LiveStreamingOnly;pub use stream::mode::live_with_backups::LiveStreamingWithBackups;pub use stream::send_error::SendError;pub use stream::send_error::SiftStreamSendError;pub use stream::send_error::SiftStreamTrySendError;pub use stream::send_error::TrySendError;pub use stream::time::TimeValue;pub use backup::DiskBackupPolicy;pub use metrics::MetricsServerBuilder;metrics-unstablepub use metrics::SiftStreamMetricsSnapshot;metrics-unstablepub use logging::LogLevel;
Modules§
- backup
- Concerned with backing up data as its streamed to Sift and backups accessible.
- logging
- metrics
- Concerned with metrics for SiftStream
- stream
- Concerned with streaming telemetry to Sift.
Structs§
- Channel
Config - Channel
Index - Represents the index of a channel in a flow.
- Flow
Builder - Builder to assist in constructing a flow, utilizing the flow descriptor to ensure that the flow is constructed correctly (i.e. value in the correct order and the correct data type).
- Flow
Config - Flow
Descriptor - Describes the schema of a flow, providing a convenient, performant, and correct way to build the flow being described.
- Flow
Descriptor Builder - Builds a
FlowDescriptor, which defines the schema of a flow.
Enums§
- Channel
Data Type - Credentials
- Specifies the source of credentials for connecting to Sift.
Type Aliases§
- Sift
Channel - A pre-configured gRPC channel to conveniently establish a connection to Sift’s gRPC API.