use sift_pbfs::BackupsDecoder;
use sift_rs::{
common::r#type::v1::ChannelDataType,
ingest::v1::{
IngestArbitraryProtobufDataStreamRequest, IngestArbitraryProtobufDataStreamResponse,
IngestWithConfigDataStreamRequest, IngestWithConfigDataStreamResponse,
},
ingestion_configs::v2::{ChannelConfig, FlowConfig},
};
use sift_stream::{
ChannelValue, DiskBackupPolicy, Flow, IngestionConfigForm, SiftStreamBuilder, TimeValue,
};
use std::io::BufReader;
use tempdir::TempDir;
use tokio_stream::StreamExt;
mod common;
use common::prelude::*;
struct NoOpIngestService;
#[async_trait]
impl IngestService for NoOpIngestService {
async fn ingest_with_config_data_stream(
&self,
request: Request<Streaming<IngestWithConfigDataStreamRequest>>,
) -> Result<Response<IngestWithConfigDataStreamResponse>, Status> {
let mut stream = request.into_inner();
while stream.try_next().await.unwrap_or(None).is_some() {}
Ok(Response::new(IngestWithConfigDataStreamResponse {}))
}
async fn ingest_arbitrary_protobuf_data_stream(
&self,
_request: Request<Streaming<IngestArbitraryProtobufDataStreamRequest>>,
) -> Result<Response<IngestArbitraryProtobufDataStreamResponse>, Status> {
unimplemented!()
}
}
fn standard_flows() -> Vec<FlowConfig> {
vec![FlowConfig {
name: "flow-0".to_string(),
channels: vec![ChannelConfig {
name: "sensor".to_string(),
data_type: ChannelDataType::Double.into(),
..Default::default()
}],
}]
}
fn make_flow(value: f64) -> Flow {
Flow::new(
"flow-0",
TimeValue::default(),
&[ChannelValue::new("sensor", value)],
)
}
fn count_messages_in_dir(dir: &std::path::Path) -> usize {
let entries = match std::fs::read_dir(dir) {
Ok(e) => e,
Err(_) => return 0,
};
let mut total = 0usize;
for entry in entries.flatten() {
let path = entry.path();
if path.is_file()
&& let Ok(f) = std::fs::File::open(&path)
{
let decoder =
BackupsDecoder::<IngestWithConfigDataStreamRequest, _>::new(BufReader::new(f));
total += decoder.filter(|r| r.is_ok()).count();
}
}
total
}
#[tokio::test]
async fn test_file_backup_basic_write() {
let (client, server) = common::start_test_ingest_server(NoOpIngestService).await;
let tmp = TempDir::new("sift_stream_fb_basic").expect("tempdir");
let mut stream = SiftStreamBuilder::from_channel(client)
.ingestion_config(IngestionConfigForm {
asset_name: "test_asset".to_string(),
client_key: "test_file_backup_key".to_string(),
flows: standard_flows(),
})
.file_backup()
.disk_backup_policy(DiskBackupPolicy {
backups_dir: Some(tmp.path().to_path_buf()),
..Default::default()
})
.metrics_streaming_interval(None)
.build()
.await
.expect("build failed");
let num_messages = 20_usize;
for i in 0..num_messages {
stream.send(make_flow(i as f64)).await.expect("send failed");
}
stream.finish().await.expect("finish failed");
let backup_dir = tmp.path().join("test_asset");
let total = count_messages_in_dir(&backup_dir);
assert_eq!(
total, num_messages,
"decoded message count must equal the number sent"
);
assert!(server.await.is_ok());
}
#[tokio::test]
async fn test_file_backup_file_rotation_creates_multiple_files() {
let (client, server) = common::start_test_ingest_server(NoOpIngestService).await;
let tmp = TempDir::new("sift_stream_fb_rotation").expect("tempdir");
let mut stream = SiftStreamBuilder::from_channel(client)
.ingestion_config(IngestionConfigForm {
asset_name: "test_asset".to_string(),
client_key: "test_rotation_key".to_string(),
flows: standard_flows(),
})
.file_backup()
.disk_backup_policy(DiskBackupPolicy {
backups_dir: Some(tmp.path().to_path_buf()),
max_backup_file_size: 1,
..Default::default()
})
.metrics_streaming_interval(None)
.build()
.await
.expect("build failed");
for i in 0..10_usize {
stream.send(make_flow(i as f64)).await.expect("send failed");
}
stream.finish().await.expect("finish failed");
let backup_dir = tmp.path().join("test_asset");
let file_count = std::fs::read_dir(&backup_dir)
.expect("backup dir must exist")
.filter(|e| e.as_ref().map(|e| e.path().is_file()).unwrap_or(false))
.count();
assert!(
file_count > 1,
"rotation must produce more than one backup file, got {file_count}"
);
assert!(server.await.is_ok());
}
#[tokio::test]
async fn test_file_backup_finish_flushes_all() {
let (client, server) = common::start_test_ingest_server(NoOpIngestService).await;
let tmp = TempDir::new("sift_stream_fb_flush").expect("tempdir");
let mut stream = SiftStreamBuilder::from_channel(client)
.ingestion_config(IngestionConfigForm {
asset_name: "test_asset".to_string(),
client_key: "test_flush_key".to_string(),
flows: standard_flows(),
})
.file_backup()
.disk_backup_policy(DiskBackupPolicy {
backups_dir: Some(tmp.path().to_path_buf()),
..Default::default()
})
.metrics_streaming_interval(None)
.build()
.await
.expect("build failed");
let num_messages = 15_usize;
for i in 0..num_messages {
stream.send(make_flow(i as f64)).await.expect("send failed");
}
stream.finish().await.expect("finish failed");
let backup_dir = tmp.path().join("test_asset");
let total = count_messages_in_dir(&backup_dir);
assert_eq!(
total, num_messages,
"finish must flush all messages — none should be lost"
);
assert!(server.await.is_ok());
}
#[tokio::test]
async fn test_file_backup_build_fails_without_backups_dir() {
let (client, server) = common::start_test_ingest_server(NoOpIngestService).await;
let result = SiftStreamBuilder::from_channel(client)
.ingestion_config(IngestionConfigForm {
asset_name: "test_asset".to_string(),
client_key: "test_no_dir_key".to_string(),
flows: standard_flows(),
})
.file_backup()
.build()
.await;
assert!(
result.is_err(),
"build() must fail when backups_dir is not set"
);
server.abort();
}