use sift_rs::{
common::r#type::v1::ChannelDataType,
ingest::v1::{
IngestArbitraryProtobufDataStreamRequest, IngestArbitraryProtobufDataStreamResponse,
IngestWithConfigDataStreamRequest, IngestWithConfigDataStreamResponse,
},
ingestion_configs::v2::{ChannelConfig, FlowConfig},
};
use sift_stream::{
ChannelValue, Flow, IngestionConfigForm, SiftStreamBuilder, TimeValue, stream::run::RunSelector,
};
use std::sync::{
Arc,
atomic::{AtomicU32, Ordering},
};
use tokio::sync::Mutex;
use tokio_stream::StreamExt;
mod common;
use common::prelude::*;
fn standard_flows() -> Vec<FlowConfig> {
vec![FlowConfig {
name: "flow-0".to_string(),
channels: vec![ChannelConfig {
name: "generator".to_string(),
data_type: ChannelDataType::Double.into(),
..Default::default()
}],
}]
}
fn standard_ingestion_config() -> IngestionConfigForm {
IngestionConfigForm {
asset_name: "test_asset".to_string(),
client_key: "test_live_only_client_key".to_string(),
flows: standard_flows(),
}
}
struct CountingIngestService {
num_received: Arc<AtomicU32>,
}
#[async_trait]
impl IngestService for CountingIngestService {
async fn ingest_with_config_data_stream(
&self,
request: Request<Streaming<IngestWithConfigDataStreamRequest>>,
) -> Result<Response<IngestWithConfigDataStreamResponse>, Status> {
let mut stream = request.into_inner();
while let Ok(Some(_)) = stream.try_next().await {
self.num_received.fetch_add(1, Ordering::Relaxed);
}
Ok(Response::new(IngestWithConfigDataStreamResponse {}))
}
async fn ingest_arbitrary_protobuf_data_stream(
&self,
_request: Request<Streaming<IngestArbitraryProtobufDataStreamRequest>>,
) -> Result<Response<IngestArbitraryProtobufDataStreamResponse>, Status> {
unimplemented!()
}
}
struct RunIdCapturingIngestService {
captured_run_ids: Arc<Mutex<Vec<String>>>,
}
#[async_trait]
impl IngestService for RunIdCapturingIngestService {
async fn ingest_with_config_data_stream(
&self,
request: Request<Streaming<IngestWithConfigDataStreamRequest>>,
) -> Result<Response<IngestWithConfigDataStreamResponse>, Status> {
let mut stream = request.into_inner();
while let Ok(Some(msg)) = stream.try_next().await {
self.captured_run_ids.lock().await.push(msg.run_id.clone());
}
Ok(Response::new(IngestWithConfigDataStreamResponse {}))
}
async fn ingest_arbitrary_protobuf_data_stream(
&self,
_request: Request<Streaming<IngestArbitraryProtobufDataStreamRequest>>,
) -> Result<Response<IngestArbitraryProtobufDataStreamResponse>, Status> {
unimplemented!()
}
}
#[tokio::test]
async fn test_live_only_basic_send() {
let num_received = Arc::new(AtomicU32::default());
let (client, server) = common::start_test_ingest_server(CountingIngestService {
num_received: num_received.clone(),
})
.await;
let mut stream = SiftStreamBuilder::from_channel(client)
.ingestion_config(standard_ingestion_config())
.live_only()
.metrics_streaming_interval(None)
.build()
.await
.expect("failed to build live-only stream");
let num_messages = 50_u32;
for i in 0..num_messages {
stream
.send(Flow::new(
"flow-0",
TimeValue::default(),
&[ChannelValue::new("generator", f64::from(i))],
))
.await
.expect("send failed");
}
stream.finish().await.expect("finish failed");
assert_eq!(
num_received.load(Ordering::Relaxed),
num_messages,
"server must receive exactly the number of messages sent"
);
assert!(server.await.is_ok(), "test server should shut down cleanly");
}
#[tokio::test]
async fn test_live_only_send_batch_requests() {
let num_received = Arc::new(AtomicU32::default());
let (client, server) = common::start_test_ingest_server(CountingIngestService {
num_received: num_received.clone(),
})
.await;
let mut stream = SiftStreamBuilder::from_channel(client)
.ingestion_config(standard_ingestion_config())
.live_only()
.metrics_streaming_interval(None)
.build()
.await
.expect("failed to build live-only stream");
let num_messages = 50_u32;
let requests = (0..num_messages).map(|i| IngestWithConfigDataStreamRequest {
ingestion_config_id: "any".to_string(),
flow: "flow-0".to_string(),
timestamp: Some(pbjson_types::Timestamp::default()),
channel_values: vec![sift_rs::ingest::v1::IngestWithConfigDataChannelValue {
r#type: Some(
sift_rs::ingest::v1::ingest_with_config_data_channel_value::Type::Double(
f64::from(i),
),
),
}],
..Default::default()
});
stream
.send_requests(requests)
.await
.expect("send_requests failed");
stream.finish().await.expect("finish failed");
assert_eq!(num_received.load(Ordering::Relaxed), num_messages);
assert!(server.await.is_ok());
}
#[tokio::test]
async fn test_live_only_run_id_propagated_from_builder() {
let captured_run_ids = Arc::new(Mutex::new(Vec::<String>::new()));
let (client, server) = common::start_test_ingest_server(RunIdCapturingIngestService {
captured_run_ids: captured_run_ids.clone(),
})
.await;
let mut stream = SiftStreamBuilder::from_channel(client)
.ingestion_config(standard_ingestion_config())
.attach_run_id("any-run-id")
.live_only()
.metrics_streaming_interval(None)
.build()
.await
.expect("failed to build live-only stream");
stream
.send(Flow::new(
"flow-0",
TimeValue::default(),
&[ChannelValue::new("generator", 1.0_f64)],
))
.await
.expect("send failed");
stream.finish().await.expect("finish failed");
let ids = captured_run_ids.lock().await;
assert!(
!ids.is_empty(),
"at least one message should have been received"
);
assert!(
ids.iter().all(|id| id == "123"),
"all messages must carry the run_id returned by the mock server, got: {ids:?}"
);
assert!(server.await.is_ok());
}
#[tokio::test]
async fn test_live_only_detach_run_clears_run_id() {
let captured_run_ids = Arc::new(Mutex::new(Vec::<String>::new()));
let (client, server) = common::start_test_ingest_server(RunIdCapturingIngestService {
captured_run_ids: captured_run_ids.clone(),
})
.await;
let mut stream = SiftStreamBuilder::from_channel(client)
.ingestion_config(standard_ingestion_config())
.live_only()
.metrics_streaming_interval(None)
.build()
.await
.expect("failed to build live-only stream");
stream
.attach_run(RunSelector::ByForm(sift_stream::RunForm {
name: "test_run".to_string(),
client_key: "test_run_key".to_string(),
..Default::default()
}))
.await
.expect("attach_run failed");
stream
.send(Flow::new(
"flow-0",
TimeValue::default(),
&[ChannelValue::new("generator", 1.0_f64)],
))
.await
.expect("send failed");
stream.detach_run();
stream
.send(Flow::new(
"flow-0",
TimeValue::default(),
&[ChannelValue::new("generator", 2.0_f64)],
))
.await
.expect("send after detach failed");
stream.finish().await.expect("finish failed");
let ids = captured_run_ids.lock().await;
assert_eq!(ids.len(), 2, "expected exactly 2 messages");
assert_eq!(ids[0], "123", "first message must carry run_id from mock");
assert_eq!(
ids[1], "",
"second message must have empty run_id after detach"
);
assert!(server.await.is_ok());
}