use std::sync::Arc;
use std::time::Duration;
use ruststream::OutgoingMessage;
use ruststream::conformance::harness;
use ruststream::runtime::{AppInfo, HandlerResult, RustStream};
use ruststream::subscriber;
use ruststream::testing::TestableBroker;
use ruststream_fred::{RedisList, RedisPubSub, RedisStream, testing::RedisTestBroker};
use serde::Deserialize;
use tokio::sync::Mutex;
#[derive(Debug, Deserialize, Clone, PartialEq)]
struct Payment {
id: u64,
user_id: u64,
amount: u64,
}
#[derive(Clone, Default)]
struct PaymentRepository {
payments: Arc<Mutex<Vec<Payment>>>,
}
impl PaymentRepository {
async fn save(&self, payment: Payment) {
self.payments.lock().await.push(payment);
}
async fn count(&self) -> usize {
self.payments.lock().await.len()
}
async fn contains(&self, id: u64) -> bool {
self.payments.lock().await.iter().any(|p| p.id == id)
}
}
#[subscriber(
RedisStream::new("payments")
.group("workers")
)]
async fn process_payment(
payment: &Payment,
ctx: &mut Context<'_, (), PaymentRepository>,
) -> HandlerResult {
if payment.amount == 0 {
return HandlerResult::drop();
}
ctx.state().save(payment.clone()).await;
HandlerResult::ack()
}
#[subscriber(
RedisStream::new("events")
.group("workers")
)]
async fn handle_stream_event(payment: &Payment) -> HandlerResult {
println!("stream event {}", payment.id);
HandlerResult::Ack
}
#[subscriber(
RedisList::new("jobs")
.reliable()
)]
async fn handle_list_job(payment: &Payment) -> HandlerResult {
println!("list job {}", payment.id);
HandlerResult::Ack
}
#[subscriber(RedisPubSub::new("notifications"))]
async fn handle_pubsub_notification(payment: &Payment) -> HandlerResult {
println!("pubsub notification {}", payment.id);
HandlerResult::Ack
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
test_payment_processing().await?;
test_stream_delivery().await?;
test_list_delivery().await?;
test_pubsub_delivery().await?;
test_conformance_suite().await?;
Ok(())
}
async fn test_payment_processing() -> Result<(), Box<dyn std::error::Error>> {
let broker = RedisTestBroker::new();
let repository = PaymentRepository::default();
let repository_for_app = repository.clone();
let app = RustStream::new(AppInfo::new("test", "0.1.0"))
.on_startup(move |()| async move { Ok::<_, std::convert::Infallible>(repository_for_app) })
.with_broker(broker.clone(), |b| {
b.include(process_payment);
});
let task = tokio::spawn(async move {
app.run_until(tokio::time::sleep(Duration::from_millis(500)))
.await
});
tokio::time::sleep(Duration::from_millis(50)).await;
broker.inject(OutgoingMessage::new(
"payments",
br#"{"id":1,"user_id":42,"amount":100}"#,
));
broker.inject(OutgoingMessage::new(
"payments",
br#"{"id":2,"user_id":42,"amount":0}"#,
));
let deadline = Duration::from_secs(2);
let start = std::time::Instant::now();
while !repository.contains(1).await && start.elapsed() < deadline {
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert!(repository.contains(1).await, "valid payment was not saved");
assert!(
!repository.contains(2).await,
"invalid payment should have been dropped"
);
assert_eq!(repository.count().await, 1);
task.await??;
Ok(())
}
async fn test_stream_delivery() -> Result<(), Box<dyn std::error::Error>> {
let broker = RedisTestBroker::new();
let app = RustStream::new(AppInfo::new("test", "0.1.0")).with_broker(broker.clone(), |b| {
b.include(handle_stream_event);
});
let task = tokio::spawn(async move {
app.run_until(tokio::time::sleep(Duration::from_millis(500)))
.await
});
tokio::time::sleep(Duration::from_millis(50)).await;
broker.inject(OutgoingMessage::new(
"events",
br#"{"id":1,"user_id":42,"amount":100}"#,
));
task.await??;
Ok(())
}
async fn test_list_delivery() -> Result<(), Box<dyn std::error::Error>> {
let broker = RedisTestBroker::new();
let app = RustStream::new(AppInfo::new("test", "0.1.0")).with_broker(broker.clone(), |b| {
b.include(handle_list_job);
});
let task = tokio::spawn(async move {
app.run_until(tokio::time::sleep(Duration::from_millis(500)))
.await
});
tokio::time::sleep(Duration::from_millis(50)).await;
broker.inject(OutgoingMessage::new(
"jobs",
br#"{"id":1,"user_id":42,"amount":100}"#,
));
task.await??;
Ok(())
}
async fn test_pubsub_delivery() -> Result<(), Box<dyn std::error::Error>> {
let broker = RedisTestBroker::new();
let app = RustStream::new(AppInfo::new("test", "0.1.0")).with_broker(broker.clone(), |b| {
b.include(handle_pubsub_notification);
});
let task = tokio::spawn(async move {
app.run_until(tokio::time::sleep(Duration::from_millis(500)))
.await
});
tokio::time::sleep(Duration::from_millis(50)).await;
broker.inject(OutgoingMessage::new(
"notifications",
br#"{"id":1,"user_id":42,"amount":100}"#,
));
task.await??;
Ok(())
}
async fn test_conformance_suite() -> Result<(), Box<dyn std::error::Error>> {
harness::run_suite(RedisTestBroker::new).await;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn valid_payment_is_saved_and_invalid_is_dropped() {
let broker = RedisTestBroker::new();
let repository = PaymentRepository::default();
let repository_for_app = repository.clone();
let app = RustStream::new(AppInfo::new("test", "0.1.0"))
.on_startup(
move |()| async move { Ok::<_, std::convert::Infallible>(repository_for_app) },
)
.with_broker(broker.clone(), |b| {
b.include(process_payment);
});
let task = tokio::spawn(async move {
app.run_until(tokio::time::sleep(Duration::from_millis(500)))
.await
});
tokio::time::sleep(Duration::from_millis(50)).await;
broker.inject(OutgoingMessage::new(
"payments",
br#"{"id":1,"user_id":42,"amount":100}"#,
));
broker.inject(OutgoingMessage::new(
"payments",
br#"{"id":2,"user_id":42,"amount":0}"#,
));
let deadline = Duration::from_secs(2);
let start = std::time::Instant::now();
while !repository.contains(1).await && start.elapsed() < deadline {
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert!(repository.contains(1).await);
assert!(!repository.contains(2).await);
assert_eq!(repository.count().await, 1);
task.await.unwrap().unwrap();
}
}