use crate::stream::mode::ingestion_config::IngestionConfigEncoder;
use crate::stream::send_error::{SendError, TrySendError};
use crate::stream::{SiftStream, Transport, private::Sealed};
use crate::{
backup::disk::file_writer::{FileWriter, FileWriterConfig},
metrics::SiftStreamMetrics,
stream::flow::FlowDescriptor,
};
use async_channel::{Receiver, Sender};
use async_trait::async_trait;
use prost::Message;
use sift_connect::SiftChannel;
use sift_error::prelude::*;
use sift_rs::{
ingest::v1::IngestWithConfigDataStreamRequest, ingestion_configs::v2::IngestionConfig,
runs::v2::Run,
};
use std::collections::HashSet;
use std::io::ErrorKind as IoErrorKind;
use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
use tokio::fs;
use tokio::{sync::broadcast, task::JoinHandle};
use uuid::Uuid;
/// Background worker state that persists ingest requests to backup files on disk
/// and records that activity in the shared stream metrics.
struct FileBackupWriter {
/// Performs the actual file operations (rotation, writes, flush, sync).
file_writer: FileWriter,
/// Shared metrics handle; its `backups` member is updated per new file and per message.
metrics: Arc<SiftStreamMetrics>,
}
impl FileBackupWriter {
    /// Builds a writer around a fresh [`FileWriter`] created from `file_writer_config`.
    fn new(file_writer_config: FileWriterConfig, metrics: Arc<SiftStreamMetrics>) -> Self {
        let file_writer = FileWriter::new(file_writer_config);
        Self {
            file_writer,
            metrics,
        }
    }

    /// Writes a single request to the current backup file.
    ///
    /// If the underlying writer reports that a rotation is due, the file is
    /// rotated first and the new file is recorded in the backup metrics. After
    /// a successful write the encoded size of the request is recorded as well.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the rotation or the write.
    async fn handle_request(&mut self, request: &IngestWithConfigDataStreamRequest) -> Result<()> {
        let rotation_due = self.file_writer.should_rotate_file();
        if rotation_due {
            let _ = self.file_writer.rotate_file().await?;
            self.metrics.backups.log_new_file();
        }

        self.file_writer.write_request(request).await?;

        let encoded_bytes = request.encoded_len() as u64;
        self.metrics.backups.log_message(encoded_bytes);
        Ok(())
    }

    /// Flushes and then syncs the underlying file writer.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by either step.
    async fn finalize(&mut self) -> Result<()> {
        self.file_writer.flush().await?;
        self.file_writer.sync().await?;
        Ok(())
    }

    /// Consumes requests from `write_rx` until the channel reports an error
    /// (closed and drained), then finalizes the writer.
    ///
    /// A failure while handling an individual request does not stop the loop;
    /// it is only logged when the `tracing` feature is enabled.
    ///
    /// # Errors
    ///
    /// Returns the error from [`Self::finalize`], if any.
    pub(crate) async fn run(
        mut self,
        write_rx: Receiver<Arc<IngestWithConfigDataStreamRequest>>,
    ) -> Result<()> {
        loop {
            let Ok(request) = write_rx.recv().await else {
                break;
            };

            // Best effort: keep draining the channel even if one write fails.
            let outcome = self.handle_request(&request).await;
            if let Err(e) = outcome {
                #[cfg(feature = "tracing")]
                tracing::error!(
                    error = %e,
                    "error handling request"
                );
            }
        }

        self.finalize().await
    }
}
/// Transport that hands ingest requests to a background task which writes
/// them to backup files, instead of sending them over the network.
pub struct FileBackup {
/// Sending half of the channel feeding the background writer task.
write_tx: Sender<Arc<IngestWithConfigDataStreamRequest>>,
/// Handle of the background writer task; awaited when the transport is finished.
write_task: JoinHandle<Result<()>>,
/// Broadcast channel used to deliver control messages (e.g. `Shutdown`) to helper tasks.
control_tx: broadcast::Sender<crate::stream::tasks::ControlMessage>,
/// Handle of the optional metrics streaming task; `None` when no streaming interval was configured.
metrics_streaming: Option<JoinHandle<Result<()>>>,
/// Names of the flows that have already been observed by this transport.
flows_seen: HashSet<String>,
/// Metrics shared with the writer task and, when enabled, the metrics streaming task.
metrics: Arc<SiftStreamMetrics>,
}
// Marker impl; presumably required so `FileBackup` can implement the sealed `Transport` trait — confirm against the trait definition.
impl Sealed for FileBackup {}
impl FileBackup {
    /// Records receipt metrics for `message` and wraps it in an [`Arc`] so it
    /// can be handed to the writer channel.
    ///
    /// Side effects:
    /// - increments `messages_received`;
    /// - on the first occurrence of a flow name, increments
    ///   `unique_flows_received`, remembers the flow, and (with the `tracing`
    ///   feature) logs it together with `stream_id`;
    /// - publishes the current writer-channel length as `backup_channel_depth`.
    fn prepare_message(
        &mut self,
        stream_id: &Uuid,
        message: IngestWithConfigDataStreamRequest,
    ) -> Arc<IngestWithConfigDataStreamRequest> {
        self.metrics.messages_received.increment();

        // `stream_id` is only consumed by the log line below.
        #[cfg(not(feature = "tracing"))]
        let _ = stream_id;

        // Unique-flow accounting is a metric, so it must not depend on whether
        // logging is compiled in; only the log statement itself is gated.
        // `contains` is checked first so the flow name is cloned only once per flow.
        if !self.flows_seen.contains(&message.flow) {
            self.metrics.unique_flows_received.increment();
            self.flows_seen.insert(message.flow.clone());
            #[cfg(feature = "tracing")]
            tracing::info!(sift_stream_id = %stream_id, "flow '{}' being ingested for the first time", &message.flow);
        }

        // Depth is sampled before this message is enqueued.
        self.metrics
            .backup_channel_depth
            .set(self.write_tx.len() as u64);

        Arc::new(message)
    }
}
#[async_trait]
impl Transport for FileBackup {
    type Encoder = IngestionConfigEncoder;
    type Message = IngestWithConfigDataStreamRequest;

    /// Queues `message` for the background writer, waiting for channel
    /// capacity if necessary.
    ///
    /// # Errors
    ///
    /// Returns the original message inside [`SendError`] when the writer
    /// channel is closed.
    async fn send(
        &mut self,
        stream_id: &Uuid,
        message: Self::Message,
    ) -> std::result::Result<(), SendError<Self::Message>> {
        let arc = self.prepare_message(stream_id, message);
        self.write_tx
            .send(arc)
            .await
            // Hand the caller back an owned message; cloning only happens if
            // another reference to the `Arc` still exists.
            .map_err(|async_channel::SendError(a)| SendError(Arc::unwrap_or_clone(a)))?;
        self.metrics.messages_sent.increment();
        Ok(())
    }

    /// Queues `message` for the background writer without waiting.
    ///
    /// # Errors
    ///
    /// Returns the original message inside [`TrySendError::Full`] when the
    /// channel has no free capacity, or [`TrySendError::Closed`] when the
    /// channel is closed.
    fn try_send(
        &mut self,
        stream_id: &Uuid,
        message: Self::Message,
    ) -> std::result::Result<(), TrySendError<Self::Message>> {
        let arc = self.prepare_message(stream_id, message);
        match self.write_tx.try_send(arc) {
            Ok(()) => {
                self.metrics.messages_sent.increment();
                Ok(())
            }
            Err(async_channel::TrySendError::Full(a)) => {
                Err(TrySendError::Full(Arc::unwrap_or_clone(a)))
            }
            Err(async_channel::TrySendError::Closed(a)) => {
                Err(TrySendError::Closed(Arc::unwrap_or_clone(a)))
            }
        }
    }

    /// Sends every request in order via [`Self::send`].
    ///
    /// # Errors
    ///
    /// Stops at the first failure and returns the failed message followed by
    /// all requests that were not attempted yet.
    async fn send_requests<I>(
        &mut self,
        stream_id: &Uuid,
        requests: I,
    ) -> std::result::Result<(), SendError<Vec<Self::Message>>>
    where
        I: IntoIterator<Item = Self::Message> + Send,
        I::IntoIter: Send,
    {
        // `while let` rather than `for`: the iterator is consumed by value on the error path.
        let mut iter = requests.into_iter();
        while let Some(msg) = iter.next() {
            if let Err(SendError(failed)) = self.send(stream_id, msg).await {
                let mut undelivered = vec![failed];
                undelivered.extend(iter);
                return Err(SendError(undelivered));
            }
        }
        Ok(())
    }

    /// Sends every request in order via [`Self::try_send`].
    ///
    /// # Errors
    ///
    /// Stops at the first failure and returns, in the same variant as the
    /// underlying error, the failed message followed by all requests that
    /// were not attempted yet.
    fn try_send_requests<I>(
        &mut self,
        stream_id: &Uuid,
        requests: I,
    ) -> std::result::Result<(), TrySendError<Vec<Self::Message>>>
    where
        I: IntoIterator<Item = Self::Message> + Send,
        I::IntoIter: Send,
    {
        // `while let` rather than `for`: the iterator is consumed by value on the error paths.
        let mut iter = requests.into_iter();
        while let Some(msg) = iter.next() {
            match self.try_send(stream_id, msg) {
                Ok(()) => {}
                Err(TrySendError::Full(failed)) => {
                    let mut undelivered = vec![failed];
                    undelivered.extend(iter);
                    return Err(TrySendError::Full(undelivered));
                }
                Err(TrySendError::Closed(failed)) => {
                    let mut undelivered = vec![failed];
                    undelivered.extend(iter);
                    return Err(TrySendError::Closed(undelivered));
                }
            }
        }
        Ok(())
    }

    /// Shuts the transport down and waits for its background tasks.
    ///
    /// Broadcasts `Shutdown` on the control channel, drops the write sender
    /// so the writer task can observe the channel closing, waits for the
    /// writer task, and then waits for the metrics streaming task if one was
    /// started.
    ///
    /// # Errors
    ///
    /// Returns a `StreamError` if the writer task could not be joined, or the
    /// error the writer task itself returned. The metrics task is awaited
    /// before either error is reported, so no task is left detached.
    async fn finish(self, stream_id: &Uuid) -> Result<()> {
        // `stream_id` is only consumed by the log line at the end.
        #[cfg(not(feature = "tracing"))]
        let _ = stream_id;

        // A send error only means nobody is subscribed; nothing to shut down then.
        let _ = self
            .control_tx
            .send(crate::stream::tasks::ControlMessage::Shutdown);

        drop(self.write_tx);
        let write_outcome = self.write_task.await.map_err(|e| {
            Error::new_msg(
                ErrorKind::StreamError,
                format!("file backup write task panicked: {e}"),
            )
        });

        // Join the metrics task before surfacing any writer failure; its own
        // result is deliberately ignored (best effort).
        if let Some(metrics_streaming) = self.metrics_streaming {
            let _ = metrics_streaming.await;
        }

        write_outcome??;

        #[cfg(feature = "tracing")]
        tracing::info!(
            sift_stream_id = %stream_id,
            "successfully finished file backup stream"
        );
        Ok(())
    }
}
impl FileBackup {
    /// Creates the transport and spawns its background tasks.
    ///
    /// A writer task is always spawned; it drains a bounded channel of
    /// `channel_capacity` messages into files described by
    /// `file_writer_config`. When `metrics_streaming_interval` is `Some`, a
    /// metrics streaming task is spawned as well, using `setup_channel` and
    /// `session_name`, and subscribed to the control channel.
    ///
    /// # Errors
    ///
    /// The visible body never returns `Err`; the `Result` return type is kept
    /// for callers.
    ///
    /// # Panics
    ///
    /// Must be called from within a Tokio runtime because it uses
    /// `tokio::spawn`. `async_channel::bounded` and `broadcast::channel` also
    /// panic when given a capacity of zero.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        file_writer_config: FileWriterConfig,
        channel_capacity: usize,
        metrics: Arc<SiftStreamMetrics>,
        control_channel_capacity: usize,
        metrics_streaming_interval: Option<Duration>,
        setup_channel: SiftChannel,
        session_name: String,
        sift_stream_id: Uuid,
    ) -> Result<Self> {
        let (write_tx, write_rx) =
            async_channel::bounded::<Arc<IngestWithConfigDataStreamRequest>>(channel_capacity);

        let writer = FileBackupWriter::new(file_writer_config, Arc::clone(&metrics));
        let write_task = tokio::spawn(async move { writer.run(write_rx).await });

        // The initial receiver is not needed; tasks subscribe on demand.
        let (control_tx, _control_rx) = broadcast::channel(control_channel_capacity);

        let metrics_streaming = metrics_streaming_interval.map(|interval| {
            let control_rx = control_tx.subscribe();
            let task_metrics = Arc::clone(&metrics);
            tokio::spawn(async move {
                // `session_name` is owned by this task, so it is moved rather than cloned.
                let metrics_task = crate::stream::tasks::MetricsStreamingTask::new(
                    setup_channel,
                    control_rx,
                    session_name,
                    interval,
                    task_metrics,
                    None,
                )
                .await?;

                #[cfg(feature = "tracing")]
                tracing::info!(
                    sift_stream_id = %sift_stream_id,
                    "metrics streaming task started for file backup mode"
                );

                metrics_task.run().await
            })
        });

        Ok(Self {
            write_tx,
            write_task,
            control_tx,
            metrics_streaming,
            flows_seen: HashSet::new(),
            metrics,
        })
    }
}
impl SiftStream<IngestionConfigEncoder, FileBackup> {
    /// Creates a stream whose transport writes requests to backup files.
    ///
    /// The backup directory is `backups_directory.join(output_directory)`
    /// (note that `PathBuf::join` replaces the base when the joined path is
    /// absolute). The directory is created if missing, and files are prefixed
    /// with the ingestion config's `client_key`.
    ///
    /// # Errors
    ///
    /// Returns a `BackupsError` when the backup directory cannot be created or
    /// when the path exists but is not a directory, and propagates any error
    /// from [`FileBackup::new`].
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn new_file_backup(
        grpc_channel: SiftChannel,
        ingestion_config: IngestionConfig,
        flows_by_name: HashMap<String, FlowDescriptor<String>>,
        run: Option<Run>,
        backups_directory: PathBuf,
        output_directory: PathBuf,
        max_file_size: usize,
        channel_capacity: usize,
        control_channel_capacity: usize,
        metrics_streaming_interval: Option<Duration>,
        session_name: String,
        sift_stream_id: Uuid,
        metrics: Arc<SiftStreamMetrics>,
    ) -> Result<Self> {
        let full_backup_path = backups_directory.join(output_directory);

        if let Err(err) = fs::create_dir_all(&full_backup_path).await {
            // `create_dir_all` already succeeds for an existing directory, so
            // `AlreadyExists` is only harmless if the path really is a
            // directory (e.g. a concurrent creator won the race). A regular
            // file in the way must be reported, otherwise every later backup
            // write would fail.
            let directory_is_usable = err.kind() == IoErrorKind::AlreadyExists
                && fs::metadata(&full_backup_path)
                    .await
                    .is_ok_and(|meta| meta.is_dir());

            if !directory_is_usable {
                return Err(Error::new(ErrorKind::BackupsError, err))
                    .with_context(|| format!("failed to create directory for backups at {}", full_backup_path.display()))
                    .help("if using a custom path for backups directory ensure that it's valid with proper permissions, otherwise contact Sift");
            }
        }

        let file_writer_config = FileWriterConfig {
            directory: full_backup_path,
            prefix: ingestion_config.client_key.clone(),
            max_size: max_file_size,
        };

        let file_backup = FileBackup::new(
            file_writer_config,
            channel_capacity,
            metrics.clone(),
            control_channel_capacity,
            metrics_streaming_interval,
            grpc_channel.clone(),
            session_name,
            sift_stream_id,
        )?;

        Ok(Self {
            grpc_channel: grpc_channel.clone(),
            encoder: IngestionConfigEncoder {
                grpc_channel,
                flows_by_name,
                ingestion_config,
                metrics,
            },
            transport: file_backup,
            run,
            sift_stream_id,
        })
    }
}
pub use super::ingestion_config::Flow;
#[cfg(test)]
mod tests {
use super::*;
use crate::test::create_mock_grpc_channel_with_service;
use crate::{FlowBuilder, TimeValue};
use sift_rs::common::r#type::v1::ChannelDataType;
use std::collections::HashMap;
use tempdir::TempDir;
/// Polls `metrics.backups.total_messages` every 10 ms until it reaches
/// `expected_total_messages`.
///
/// Panics if the target has not been reached once more than `timeout_ms`
/// milliseconds have elapsed.
async fn wait_for_backup_metrics(
    metrics: &SiftStreamMetrics,
    expected_total_messages: u64,
    timeout_ms: u64,
) {
    let started_at = std::time::Instant::now();
    let budget = tokio::time::Duration::from_millis(timeout_ms);
    let poll_interval = tokio::time::Duration::from_millis(10);

    loop {
        let total_messages = metrics.backups.total_messages.get();
        if total_messages >= expected_total_messages {
            break;
        }

        assert!(
            started_at.elapsed() <= budget,
            "Timeout waiting for backup metrics: expected {} messages, got {}",
            expected_total_messages,
            total_messages
        );

        tokio::time::sleep(poll_interval).await;
    }
}
/// Builds a [`FileBackup`] for tests, backed by a mock gRPC channel, with
/// metrics streaming disabled and a control channel capacity of 100.
async fn create_test_file_backup_mode(
    file_writer_config: FileWriterConfig,
    channel_capacity: usize,
    metrics: Arc<SiftStreamMetrics>,
) -> FileBackup {
    let control_channel_capacity = 100;
    let metrics_streaming_interval = None;
    let (grpc_channel, _) = create_mock_grpc_channel_with_service().await;

    let created = FileBackup::new(
        file_writer_config,
        channel_capacity,
        metrics,
        control_channel_capacity,
        metrics_streaming_interval,
        grpc_channel,
        "test_session".to_string(),
        Uuid::new_v4(),
    );
    created.unwrap()
}
mod file_backup_writer {
use super::*;
/// Builds a minimal request for `flow` with no timestamp and no channel
/// values; run and organization ids are random UUIDs.
fn create_test_request(
    flow: &str,
    ingestion_config_id: &str,
) -> IngestWithConfigDataStreamRequest {
    let run_id = Uuid::new_v4().to_string();
    let organization_id = Uuid::new_v4().to_string();

    IngestWithConfigDataStreamRequest {
        ingestion_config_id: ingestion_config_id.to_owned(),
        flow: flow.to_owned(),
        timestamp: None,
        channel_values: Vec::new(),
        run_id,
        end_stream_on_validation_error: false,
        organization_id,
    }
}
/// A single handled request opens a file and is reflected in the writer's
/// current-file context.
#[tokio::test]
async fn test_file_backup_writer_handle_request() {
    let temp_dir = TempDir::new("test_file_backup_writer").unwrap();
    let mut writer = FileBackupWriter::new(
        FileWriterConfig {
            directory: temp_dir.path().to_path_buf(),
            prefix: "test".to_string(),
            max_size: 1024 * 1024,
        },
        Arc::new(SiftStreamMetrics::default()),
    );

    let ingestion_config_id = Uuid::new_v4().to_string();
    let request = create_test_request("test_flow", &ingestion_config_id);
    writer.handle_request(&request).await.unwrap();

    let file_writer = &writer.file_writer;
    assert!(file_writer.current_file.is_some());

    let ctx = &file_writer.current_file_ctx;
    assert_eq!(ctx.message_count, 1);
    assert!(ctx.file_size > 0);
    assert!(ctx.file_path.exists());
}
/// With a tiny `max_size`, repeated writes must make the writer move on to a
/// new file while leaving the previous file on disk.
#[tokio::test]
async fn test_file_backup_writer_handle_request_rotates_file() {
    let temp_dir = TempDir::new("test_file_backup_writer").unwrap();
    let config = FileWriterConfig {
        directory: temp_dir.path().to_path_buf(),
        prefix: "test".to_string(),
        max_size: 100,
    };
    let metrics = Arc::new(SiftStreamMetrics::default());
    let mut writer = FileBackupWriter::new(config, metrics);

    let ingestion_config_id = Uuid::new_v4().to_string();
    let request = create_test_request("test_flow", &ingestion_config_id);
    assert!(writer.handle_request(&request).await.is_ok());

    let file_path_before_rotation = writer.file_writer.current_file_ctx.file_path.clone();

    // Keep writing until the writer switches to a different file.
    let mut rotated = false;
    for _ in 0..100 {
        writer.handle_request(&request).await.unwrap();
        if writer.file_writer.current_file_ctx.file_path != file_path_before_rotation {
            rotated = true;
            break;
        }
    }

    // Without this check the test would pass even if rotation never happened.
    assert!(
        rotated,
        "expected the writer to rotate to a new file after exceeding max_size"
    );
    assert!(file_path_before_rotation.exists());
}
/// Finalizing after one write succeeds and leaves the written file in place.
#[tokio::test]
async fn test_file_backup_writer_finalize() {
    let temp_dir = TempDir::new("test_file_backup_writer").unwrap();
    let mut writer = FileBackupWriter::new(
        FileWriterConfig {
            directory: temp_dir.path().to_path_buf(),
            prefix: "test".to_string(),
            max_size: 1024 * 1024,
        },
        Arc::new(SiftStreamMetrics::default()),
    );

    let ingestion_config_id = Uuid::new_v4().to_string();
    let request = create_test_request("test_flow", &ingestion_config_id);
    writer.handle_request(&request).await.unwrap();

    // Snapshot the state before finalizing.
    let written = &writer.file_writer.current_file_ctx;
    let file_path = written.file_path.clone();
    assert!(file_path.exists());
    assert_eq!(written.message_count, 1);
    assert!(written.file_size > 0);

    writer.finalize().await.unwrap();
    assert!(file_path.exists());
}
/// `run` consumes everything already queued on a closed channel and leaves
/// non-empty backup files behind.
#[tokio::test]
async fn test_file_backup_writer_run_drains_channel() {
    let temp_dir = TempDir::new("test_file_backup_writer").unwrap();
    let writer = FileBackupWriter::new(
        FileWriterConfig {
            directory: temp_dir.path().to_path_buf(),
            prefix: "test".to_string(),
            max_size: 1024 * 1024,
        },
        Arc::new(SiftStreamMetrics::default()),
    );

    let (tx, rx) = async_channel::bounded(10);
    let ingestion_config_id = Uuid::new_v4().to_string();
    for i in 0..5 {
        let flow_name = format!("flow_{}", i);
        let request = create_test_request(&flow_name, &ingestion_config_id);
        tx.send(Arc::new(request)).await.unwrap();
    }
    // Closing the channel lets `run` terminate once it has drained the queue.
    drop(tx);

    writer.run(rx).await.unwrap();

    let has_backup_prefix = |entry: &std::fs::DirEntry| {
        entry
            .file_name()
            .to_str()
            .is_some_and(|name| name.starts_with("test-"))
    };
    let files: Vec<_> = std::fs::read_dir(temp_dir.path())
        .unwrap()
        .filter_map(Result::ok)
        .filter(has_backup_prefix)
        .collect();

    assert!(
        !files.is_empty(),
        "Expected at least one file with prefix 'test-' to be created"
    );
    for file in &files {
        let size = std::fs::metadata(file.path()).unwrap().len();
        assert!(size > 0, "File should have content");
    }
}
/// Checks that the writer task keeps running after a request could not be
/// handled, and that it writes later requests once the target directory exists.
#[tokio::test]
async fn test_file_backup_writer_run_continues_after_handle_request_error() {
let temp_dir = TempDir::new("test_file_backup_writer").unwrap();
// Point the writer at a directory that has not been created yet.
let backup_dir = temp_dir.path().join("backup_subdir");
assert!(!backup_dir.exists(), "subdir must not exist yet");
let config = FileWriterConfig {
directory: backup_dir.clone(),
prefix: "test".to_string(),
max_size: 1024 * 1024,
};
let writer = FileBackupWriter::new(config, Arc::new(SiftStreamMetrics::default()));
let (write_tx, write_rx) = async_channel::bounded(10);
let ingestion_config_id = Uuid::new_v4().to_string();
let run_handle = tokio::spawn(async move { writer.run(write_rx).await });
// First request: presumably fails inside the writer because the directory is missing.
let request = create_test_request("flow_0", &ingestion_config_id);
write_tx.send(Arc::new(request)).await.unwrap();
// Wait until the writer has taken the message off the channel.
// NOTE(review): an empty channel only shows the message was received, not that
// handling finished, and this loop has no timeout.
while !write_tx.is_empty() {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
// The task must still be alive even though the first request could not be written.
assert!(!run_handle.is_finished());
// Create the directory so that subsequent writes can succeed.
tokio::fs::create_dir_all(&backup_dir).await.unwrap();
let second_request = create_test_request("flow_1", &ingestion_config_id);
write_tx.send(Arc::new(second_request)).await.unwrap();
while !write_tx.is_empty() {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
// Dropping the sender closes the channel, letting `run` finalize and return.
drop(write_tx);
assert!(
run_handle.await.unwrap().is_ok(),
"run task should complete successfully."
);
// Collect the backup files (named with the "test-" prefix) that were produced.
let files: Vec<_> = std::fs::read_dir(&backup_dir)
.unwrap()
.filter_map(|entry| entry.ok())
.filter(|entry| {
entry
.path()
.file_name()
.and_then(|n| n.to_str())
.map(|n| n.starts_with("test-"))
.unwrap_or(false)
})
.collect();
assert!(
!files.is_empty(),
"Expected at least one file with prefix 'test-' after creating directory"
);
for file in &files {
let metadata = std::fs::metadata(file.path()).unwrap();
assert!(metadata.len() > 0, "File should have content");
}
}
}
/// Builds an ingestion config with random ids and the fixed client key
/// `"test_client_key"`.
fn create_test_ingestion_config() -> IngestionConfig {
    let ingestion_config_id = Uuid::new_v4().to_string();
    let asset_id = Uuid::new_v4().to_string();

    IngestionConfig {
        ingestion_config_id,
        asset_id,
        client_key: String::from("test_client_key"),
    }
}
/// Builds a flow descriptor named `flow_name` for `ingestion_config_id` with
/// two channels: `channel1` (double) and `channel2` (int32).
fn create_test_flow_descriptor(
    ingestion_config_id: &str,
    flow_name: &str,
) -> FlowDescriptor<String> {
    let channels = [
        ("channel1", ChannelDataType::Double),
        ("channel2", ChannelDataType::Int32),
    ];

    let mut builder = crate::stream::flow::FlowDescriptorBuilder::new(
        ingestion_config_id.to_owned(),
        flow_name.to_owned(),
    );
    for (channel_name, data_type) in channels {
        builder.add(channel_name.to_string(), data_type);
    }
    builder.build()
}
/// Builds a minimal request for `flow` with no timestamp and no channel
/// values; run and organization ids are random UUIDs.
fn create_test_request(
    flow: &str,
    ingestion_config_id: &str,
) -> IngestWithConfigDataStreamRequest {
    let run_id = Uuid::new_v4().to_string();
    let organization_id = Uuid::new_v4().to_string();

    IngestWithConfigDataStreamRequest {
        ingestion_config_id: String::from(ingestion_config_id),
        flow: String::from(flow),
        timestamp: None,
        channel_values: Vec::new(),
        run_id,
        end_stream_on_validation_error: false,
        organization_id,
    }
}
/// Dropping the control sender lets the metrics streaming task finish with
/// a successful result.
#[tokio::test]
async fn test_file_backup_metrics_streaming_task_completes_when_control_channel_closed() {
    let (setup_channel, _) = create_mock_grpc_channel_with_service().await;
    let temp_dir = TempDir::new("test_file_backup").unwrap();

    let file_backup = FileBackup::new(
        FileWriterConfig {
            directory: temp_dir.path().to_path_buf(),
            prefix: "test".to_string(),
            max_size: 1024 * 1024,
        },
        10,
        Arc::new(SiftStreamMetrics::default()),
        100,
        Some(std::time::Duration::from_secs(60)),
        setup_channel,
        "test_session".to_string(),
        Uuid::new_v4(),
    )
    .unwrap();

    // Take the task handle, then close the control channel by dropping its sender.
    let metrics_streaming = file_backup.metrics_streaming;
    drop(file_backup.control_tx);

    let join_outcome = metrics_streaming
        .expect("metrics streaming task")
        .await;
    let metrics_streaming_result =
        join_outcome.expect("metrics streaming task should complete successfully.");

    assert!(
        metrics_streaming_result.is_ok(),
        "metrics streaming task should have returned success."
    );
}
/// A single `try_send` is counted as sent and ends up as one backed-up message.
#[tokio::test]
async fn test_file_backup_mode_send_impl() {
    let ingestion_config = create_test_ingestion_config();
    let temp_dir = TempDir::new("test_file_backup").unwrap();
    let metrics = Arc::new(SiftStreamMetrics::default());
    let sift_stream_id = Uuid::new_v4();

    let mut transport = create_test_file_backup_mode(
        FileWriterConfig {
            directory: temp_dir.path().to_path_buf(),
            prefix: ingestion_config.client_key.clone(),
            max_size: 1024 * 1024,
        },
        1024 * 100,
        metrics.clone(),
    )
    .await;

    let request = create_test_request("test_flow", &ingestion_config.ingestion_config_id);
    transport.try_send(&sift_stream_id, request).unwrap();
    wait_for_backup_metrics(&metrics, 1, 1000).await;

    assert!(metrics.messages_sent.get() > 0);
    assert_eq!(metrics.backups.total_messages.get(), 1);

    transport.finish(&sift_stream_id).await.unwrap();
}
/// Three messages over two distinct flow names count as two unique flows.
#[tokio::test]
async fn test_file_backup_mode_send_impl_tracks_unique_flows() {
    let ingestion_config = create_test_ingestion_config();
    let temp_dir = TempDir::new("test_file_backup").unwrap();
    let metrics = Arc::new(SiftStreamMetrics::default());
    let sift_stream_id = Uuid::new_v4();

    let mut transport = create_test_file_backup_mode(
        FileWriterConfig {
            directory: temp_dir.path().to_path_buf(),
            prefix: ingestion_config.client_key.clone(),
            max_size: 1024 * 1024,
        },
        1024 * 100,
        metrics.clone(),
    )
    .await;

    // "flow1" appears twice on purpose.
    let requests: Vec<_> = ["flow1", "flow2", "flow1"]
        .into_iter()
        .map(|flow| create_test_request(flow, &ingestion_config.ingestion_config_id))
        .collect();
    for request in requests {
        transport.try_send(&sift_stream_id, request).unwrap();
    }
    wait_for_backup_metrics(&metrics, 3, 1000).await;

    assert_eq!(metrics.unique_flows_received.get(), 2);
    assert_eq!(metrics.messages_sent.get(), 3);
    assert_eq!(metrics.backups.total_messages.get(), 3);

    transport.finish(&sift_stream_id).await.unwrap();
}
/// A batch passed to `try_send_requests` is received, sent and backed up in full.
#[tokio::test]
async fn test_file_backup_mode_send_requests() {
    let ingestion_config = create_test_ingestion_config();
    let temp_dir = TempDir::new("test_file_backup").unwrap();
    let metrics = Arc::new(SiftStreamMetrics::default());
    let sift_stream_id = Uuid::new_v4();

    let mut transport = create_test_file_backup_mode(
        FileWriterConfig {
            directory: temp_dir.path().to_path_buf(),
            prefix: ingestion_config.client_key.clone(),
            max_size: 1024 * 1024,
        },
        1024 * 100,
        metrics.clone(),
    )
    .await;

    let requests: Vec<_> = ["flow1", "flow2", "flow3"]
        .into_iter()
        .map(|flow| create_test_request(flow, &ingestion_config.ingestion_config_id))
        .collect();
    transport
        .try_send_requests(&sift_stream_id, requests)
        .unwrap();
    wait_for_backup_metrics(&metrics, 3, 1000).await;

    assert_eq!(metrics.messages_received.get(), 3);
    assert_eq!(metrics.messages_sent.get(), 3);
    assert_eq!(metrics.backups.total_messages.get(), 3);

    transport.finish(&sift_stream_id).await.unwrap();
}
/// A request built from a registered flow descriptor is received, sent and
/// backed up.
#[tokio::test]
async fn test_file_backup_mode_send_with_flow_descriptor() {
    let ingestion_config = create_test_ingestion_config();
    let temp_dir = TempDir::new("test_file_backup").unwrap();
    let metrics = Arc::new(SiftStreamMetrics::default());
    let sift_stream_id = Uuid::new_v4();

    let flow_name = "test_flow";
    // Argument order is (ingestion_config_id, flow_name).
    let flow_descriptor =
        create_test_flow_descriptor(&ingestion_config.ingestion_config_id, flow_name);
    let mut flows_by_name = HashMap::new();
    flows_by_name.insert(flow_name.to_string(), flow_descriptor);

    let file_writer_config = FileWriterConfig {
        directory: temp_dir.path().to_path_buf(),
        prefix: ingestion_config.client_key.clone(),
        max_size: 1024 * 1024,
    };
    let mut mode =
        create_test_file_backup_mode(file_writer_config, 1024 * 100, metrics.clone()).await;

    // Build the request from the registered descriptor instead of a second,
    // separately constructed one.
    let descriptor = &flows_by_name[flow_name];
    let mut builder = FlowBuilder::new(descriptor);
    assert!(builder.set_with_key("channel1", 1.0_f64).is_ok());
    assert!(builder.set_with_key("channel2", 42_i32).is_ok());
    let request = builder.request(TimeValue::now());

    mode.try_send(&sift_stream_id, request).unwrap();
    wait_for_backup_metrics(&metrics, 1, 1000).await;

    assert_eq!(metrics.messages_received.get(), 1);
    assert_eq!(metrics.messages_sent.get(), 1);
    assert_eq!(metrics.backups.total_messages.get(), 1);

    mode.finish(&sift_stream_id).await.unwrap();
}
/// A request for a flow that was never registered with the transport is
/// still received, sent and backed up.
#[tokio::test]
async fn test_file_backup_mode_send_without_flow_descriptor() {
    let ingestion_config = create_test_ingestion_config();
    let temp_dir = TempDir::new("test_file_backup").unwrap();
    let metrics = Arc::new(SiftStreamMetrics::default());
    let sift_stream_id = Uuid::new_v4();

    let file_writer_config = FileWriterConfig {
        directory: temp_dir.path().to_path_buf(),
        prefix: ingestion_config.client_key.clone(),
        max_size: 1024 * 1024,
    };
    let mut mode =
        create_test_file_backup_mode(file_writer_config, 1024 * 100, metrics.clone()).await;

    // Argument order is (ingestion_config_id, flow_name).
    let descriptor =
        create_test_flow_descriptor(&ingestion_config.ingestion_config_id, "unknown_flow");
    let mut builder = FlowBuilder::new(&descriptor);
    assert!(builder.set_with_key("channel1", 1.0_f64).is_ok());
    assert!(builder.set_with_key("channel2", 42_i32).is_ok());
    let request = builder.request(TimeValue::now());

    mode.try_send(&sift_stream_id, request).unwrap();
    wait_for_backup_metrics(&metrics, 1, 1000).await;

    assert_eq!(metrics.messages_received.get(), 1);
    assert_eq!(metrics.messages_sent.get(), 1);
    assert_eq!(metrics.backups.total_messages.get(), 1);

    mode.finish(&sift_stream_id).await.unwrap();
}
/// A file-backup stream that never received data can be finished cleanly.
#[tokio::test]
async fn test_sift_stream_finish() {
    let (grpc_channel, _) = create_mock_grpc_channel_with_service().await;
    let ingestion_config = create_test_ingestion_config();
    let temp_dir = TempDir::new("test_file_backup").unwrap();
    let metrics = Arc::new(SiftStreamMetrics::default());
    let session_name = format!("test_stream.{}", ingestion_config.client_key);
    let sift_stream_id = Uuid::new_v4();

    let backups_directory = temp_dir.path().to_path_buf();
    let output_directory = temp_dir.path().to_path_buf();
    let max_file_size = 1024 * 1024;
    let channel_capacity = 1024 * 100;
    let control_channel_capacity = 100;
    let metrics_streaming_interval = None;

    let stream = SiftStream::new_file_backup(
        grpc_channel,
        ingestion_config,
        HashMap::new(),
        None,
        backups_directory,
        output_directory,
        max_file_size,
        channel_capacity,
        control_channel_capacity,
        metrics_streaming_interval,
        session_name,
        sift_stream_id,
        metrics,
    )
    .await
    .expect("failed to create file backup stream");

    stream.finish().await.unwrap();
}
/// A file-backup stream can be finished cleanly after one request was sent.
#[tokio::test]
async fn test_sift_stream_finish_with_written_data() {
    let (grpc_channel, _) = create_mock_grpc_channel_with_service().await;
    let ingestion_config = create_test_ingestion_config();
    let temp_dir = TempDir::new("test_file_backup").unwrap();
    let metrics = Arc::new(SiftStreamMetrics::default());
    let session_name = format!("test_stream.{}", ingestion_config.client_key);
    let sift_stream_id = Uuid::new_v4();

    let backups_directory = temp_dir.path().to_path_buf();
    let output_directory = temp_dir.path().to_path_buf();
    let max_file_size = 1024 * 1024;
    let channel_capacity = 1024 * 100;
    let control_channel_capacity = 100;
    let metrics_streaming_interval = None;

    let mut stream = SiftStream::new_file_backup(
        grpc_channel,
        ingestion_config.clone(),
        HashMap::new(),
        None,
        backups_directory,
        output_directory,
        max_file_size,
        channel_capacity,
        control_channel_capacity,
        metrics_streaming_interval,
        session_name,
        sift_stream_id,
        metrics,
    )
    .await
    .expect("failed to create file backup stream");

    let request = create_test_request("test_flow", &ingestion_config.ingestion_config_id);
    stream.send_requests(vec![request]).await.unwrap();

    stream.finish().await.unwrap();
}
/// Builds a [`FileBackup`] whose write channel has capacity `cap` and hands
/// the receiving half to the caller, so tests control when (or whether)
/// messages are consumed. The writer task is a stub that returns `Ok(())`.
fn make_file_backup_with_capacity(
    cap: usize,
) -> (
    FileBackup,
    async_channel::Receiver<Arc<IngestWithConfigDataStreamRequest>>,
) {
    let (write_tx, write_rx) = async_channel::bounded(cap);
    let (control_tx, _) = tokio::sync::broadcast::channel(10);
    let write_task = tokio::spawn(async { Ok(()) });
    let metrics = Arc::new(SiftStreamMetrics::default());

    let transport = FileBackup {
        write_tx,
        write_task,
        control_tx,
        metrics_streaming: None,
        flows_seen: HashSet::new(),
        metrics,
    };
    (transport, write_rx)
}
/// `try_send` on a full channel reports `Full` and returns the rejected message.
#[tokio::test]
async fn test_file_backup_try_send_full_returns_full() {
    let (mut fb, write_rx) = make_file_backup_with_capacity(1);
    let stream_id = Uuid::new_v4();
    let ingestion_config_id = Uuid::new_v4().to_string();

    // Occupy the single slot.
    let filler = create_test_request("flow1", &ingestion_config_id);
    fb.try_send(&stream_id, filler).unwrap();

    let rejected = create_test_request("flow2", &ingestion_config_id);
    let expected_flow = rejected.flow.clone();
    let err = fb.try_send(&stream_id, rejected).unwrap_err();

    assert!(err.is_full(), "expected Full, got {err}");
    assert_eq!(err.into_inner().flow, expected_flow);

    // The receiver is kept alive until here so the channel stays open.
    drop(write_rx);
}
/// `try_send` on a closed channel reports `Closed` and returns the message.
#[tokio::test]
async fn test_file_backup_try_send_closed_returns_closed() {
    let (mut fb, write_rx) = make_file_backup_with_capacity(10);
    let stream_id = Uuid::new_v4();
    let ingestion_config_id = Uuid::new_v4().to_string();

    // Dropping the only receiver closes the channel.
    drop(write_rx);

    let rejected = create_test_request("flow1", &ingestion_config_id);
    let expected_flow = rejected.flow.clone();
    let err = fb.try_send(&stream_id, rejected).unwrap_err();

    assert!(err.is_closed(), "expected Closed, got {err}");
    assert_eq!(err.into_inner().flow, expected_flow);
}
/// `send` on a closed channel fails and returns the original message.
#[tokio::test]
async fn test_file_backup_send_closed_returns_send_error() {
    let (mut fb, write_rx) = make_file_backup_with_capacity(10);
    let stream_id = Uuid::new_v4();
    let ingestion_config_id = Uuid::new_v4().to_string();

    // Dropping the only receiver closes the channel.
    drop(write_rx);

    let rejected = create_test_request("flow1", &ingestion_config_id);
    let expected_flow = rejected.flow.clone();
    let err = fb.send(&stream_id, rejected).await.unwrap_err();

    assert_eq!(err.into_inner().flow, expected_flow);
}
/// `send` on a full channel completes once a consumer frees a slot.
#[tokio::test]
async fn test_file_backup_send_blocks_until_space_available() {
    let (mut fb, write_rx) = make_file_backup_with_capacity(1);
    let stream_id = Uuid::new_v4();
    let ingestion_config_id = Uuid::new_v4().to_string();

    // Occupy the single slot.
    let filler = create_test_request("flow1", &ingestion_config_id);
    fb.try_send(&stream_id, filler).unwrap();

    // Consumer: free one slot after a short delay, then keep the receiver
    // alive for a while so the channel stays open during the send below.
    let consumer = async move {
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        let _ = write_rx.recv().await;
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    };
    tokio::spawn(consumer);

    let pending = create_test_request("flow2", &ingestion_config_id);
    fb.send(&stream_id, pending).await.unwrap();
}
/// When the channel is already full, `try_send_requests` reports `Full` and
/// returns the whole batch as undelivered.
#[tokio::test]
async fn test_file_backup_try_send_requests_returns_undelivered_on_full() {
    let (mut fb, write_rx) = make_file_backup_with_capacity(1);
    let stream_id = Uuid::new_v4();
    let ingestion_config_id = Uuid::new_v4().to_string();

    // Occupy the single slot.
    let filler = create_test_request("flow0", &ingestion_config_id);
    fb.try_send(&stream_id, filler).unwrap();

    let batch: Vec<_> = ["flow1", "flow2", "flow3"]
        .into_iter()
        .map(|flow| create_test_request(flow, &ingestion_config_id))
        .collect();
    let err = fb.try_send_requests(&stream_id, batch).unwrap_err();

    assert!(err.is_full(), "expected Full, got {err}");
    assert_eq!(err.into_inner().len(), 3);

    // The receiver is kept alive until here so the channel stays open.
    drop(write_rx);
}
/// When the channel is closed, `send_requests` returns the whole batch as
/// undelivered.
#[tokio::test]
async fn test_file_backup_send_requests_returns_undelivered_on_closed() {
    let (mut fb, write_rx) = make_file_backup_with_capacity(10);
    let stream_id = Uuid::new_v4();
    let ingestion_config_id = Uuid::new_v4().to_string();

    // Dropping the only receiver closes the channel.
    drop(write_rx);

    let batch: Vec<_> = ["flow1", "flow2", "flow3"]
        .into_iter()
        .map(|flow| create_test_request(flow, &ingestion_config_id))
        .collect();
    let err = fb.send_requests(&stream_id, batch).await.unwrap_err();

    assert_eq!(err.into_inner().len(), 3);
}
}