use crate::json_deserialize;
use log::error;
use nats::asynk::Subscription;
use serde::de::DeserializeOwned;
use std::time::{Duration, Instant};
#[doc(hidden)]
pub enum SubscriptionNextResult<T: serde::de::DeserializeOwned> {
Item(T),
Timeout,
Cancelled,
Err(String),
}
#[doc(hidden)]
pub struct SubscriptionStream {
sub: Subscription,
}
impl SubscriptionStream {
pub fn new(sub: Subscription) -> SubscriptionStream {
SubscriptionStream { sub }
}
pub async fn next<T: DeserializeOwned>(
&mut self,
timeout: Duration,
) -> SubscriptionNextResult<T> {
match tokio::time::timeout(timeout, self.sub.next()).await {
Err(_) => SubscriptionNextResult::Timeout,
Ok(Some(msg)) => match json_deserialize::<T>(&msg.data) {
Ok(item) => SubscriptionNextResult::Item(item),
Err(e) => SubscriptionNextResult::Err(e.to_string()),
},
Ok(None) => SubscriptionNextResult::Cancelled,
}
}
pub async fn collect<T: DeserializeOwned>(
&mut self,
timeout: Duration,
reason: &str,
) -> Vec<T> {
let start = Instant::now();
let mut items = Vec::new();
loop {
let elapsed = start.elapsed();
if elapsed >= timeout {
break;
}
match self.next(timeout - elapsed).await {
SubscriptionNextResult::Item(item) => items.push(item),
SubscriptionNextResult::Cancelled | SubscriptionNextResult::Timeout => break,
SubscriptionNextResult::Err(s) => {
error!("corrupt message received {}: {}", reason, s);
}
}
}
items
}
}