use crate::BoxStream;
use crate::subscription::{SubscriptionId, SubscriptionSource};
use futures::StreamExt;
use tokio::sync::broadcast;
#[derive(Debug, Clone)]
pub struct MockSource<T: Clone> {
sender: broadcast::Sender<T>,
id: SubscriptionId,
}
impl<T: Clone + 'static> MockSource<T> {
#[must_use]
pub fn with_capacity(capacity: usize) -> Self {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("System time before UNIX_EPOCH")
.as_nanos()
.hash(&mut hasher);
std::any::type_name::<T>().hash(&mut hasher);
let (tx, _rx) = broadcast::channel(capacity);
Self {
sender: tx,
id: SubscriptionId::of::<Self>(hasher.finish()),
}
}
#[must_use]
pub fn new() -> Self {
Self::with_capacity(100)
}
pub fn emit(&self, value: T) -> Result<usize, broadcast::error::SendError<T>> {
self.sender.send(value)
}
#[must_use]
pub fn receiver_count(&self) -> usize {
self.sender.receiver_count()
}
}
impl<T: Clone + 'static> Default for MockSource<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Clone + Send + 'static> SubscriptionSource for MockSource<T> {
type Output = T;
fn stream(&self) -> BoxStream<'static, Self::Output> {
let rx = self.sender.subscribe();
Box::pin(
tokio_stream::wrappers::BroadcastStream::new(rx)
.filter_map(|result| async move { result.ok() }),
)
}
fn id(&self) -> SubscriptionId {
self.id
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::subscription::Subscription;
use futures::StreamExt;
#[test]
fn test_mock_source_creation() {
let mock = MockSource::<i32>::new();
assert_eq!(mock.receiver_count(), 0);
}
#[test]
fn test_emit() {
let mock = MockSource::<i32>::new();
assert!(mock.emit(42).is_err());
let _rx = mock.sender.subscribe();
assert_eq!(mock.receiver_count(), 1);
assert_eq!(mock.emit(42).expect("should emit to receiver"), 1);
assert_eq!(mock.emit(100).expect("should emit to receiver"), 1);
}
#[test]
fn test_clone() {
let mock1 = MockSource::<i32>::new();
let mock2 = mock1.clone();
let _rx = mock1.sender.subscribe();
assert_eq!(mock2.receiver_count(), 1);
}
#[tokio::test]
async fn test_stream_receives_values() {
let mock = MockSource::<i32>::new();
let sub = Subscription::new(mock.clone());
let mut stream = (sub.spawn)();
mock.emit(1).expect("should emit to stream");
mock.emit(2).expect("should emit to stream");
mock.emit(3).expect("should emit to stream");
let mut values = Vec::new();
for _ in 0..3 {
if let Some(value) = stream.next().await {
values.push(value);
}
}
assert_eq!(values, vec![1, 2, 3]);
}
}