use crate::connection::ConnectionManager;
use crate::entity::Entity;
use crate::store::SharedStore;
use crate::stream::{EntityStream, KeyFilter, RichEntityStream, Update};
use futures_util::Stream;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
/// Typed handle to one view, addressed by `view_path`.
///
/// `T` is the element type used when reading entries from the store. No `T`
/// value is stored in the handle itself; the parameter is carried only through
/// `PhantomData`.
pub struct ViewHandle<T> {
/// Connection used to ensure a subscription exists for the view.
connection: ConnectionManager,
/// Store the view's entries are read from.
store: SharedStore,
/// Path identifying the view for both the connection and the store.
view_path: String,
/// Timeout handed to the store when waiting for the view to become ready.
initial_data_timeout: Duration,
/// Ties the handle to `T` without holding a value of that type.
_marker: PhantomData<T>,
}
impl<T> ViewHandle<T>
where
T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
{
pub async fn get(&self) -> Vec<T> {
self.connection
.ensure_subscription(&self.view_path, None)
.await;
self.store
.wait_for_view_ready(&self.view_path, self.initial_data_timeout)
.await;
self.store.list::<T>(&self.view_path).await
}
pub fn watch(&self) -> WatchBuilder<T>
where
T: Unpin,
{
WatchBuilder::new(
self.connection.clone(),
self.store.clone(),
self.view_path.clone(),
KeyFilter::None,
)
}
pub fn watch_keys(&self, keys: &[&str]) -> WatchBuilder<T>
where
T: Unpin,
{
WatchBuilder::new(
self.connection.clone(),
self.store.clone(),
self.view_path.clone(),
KeyFilter::Multiple(keys.iter().map(|s| s.to_string()).collect()),
)
}
}
/// Builder for watching a view. It also implements `Stream` directly.
///
/// The builder records the watch options (`key_filter`, `take`, `skip`). The
/// underlying `EntityStream` is stored in `stream` and starts out as `None`.
pub struct WatchBuilder<T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static> {
    /// Connection forwarded to the stream constructor.
    connection: ConnectionManager,
    /// Store forwarded to the stream constructor.
    store: SharedStore,
    /// Path of the watched view.
    view_path: String,
    /// Which keys of the view are watched.
    key_filter: KeyFilter,
    /// Optional `take` option forwarded to the stream constructor.
    take: Option<u32>,
    /// Optional `skip` option forwarded to the stream constructor.
    skip: Option<u32>,
    /// Inner stream; `None` until one has been created.
    stream: Option<EntityStream<T>>,
}
impl<T> WatchBuilder<T>
where
    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
{
    /// Creates a builder with no `take`/`skip` options and no inner stream yet.
    fn new(
        connection: ConnectionManager,
        store: SharedStore,
        view_path: String,
        key_filter: KeyFilter,
    ) -> Self {
        Self {
            take: None,
            skip: None,
            stream: None,
            connection,
            store,
            view_path,
            key_filter,
        }
    }

    /// Stores `n` as the `take` option and returns the updated builder.
    ///
    /// The value is forwarded unchanged to the stream constructor; its exact
    /// meaning is defined there (presumably a limit on results; confirm against
    /// `EntityStream`).
    pub fn take(self, n: u32) -> Self {
        Self {
            take: Some(n),
            ..self
        }
    }

    /// Stores `n` as the `skip` option and returns the updated builder.
    ///
    /// The value is forwarded unchanged to the stream constructor; its exact
    /// meaning is defined there (presumably an offset; confirm against
    /// `EntityStream`).
    pub fn skip(self, n: u32) -> Self {
        Self {
            skip: Some(n),
            ..self
        }
    }

    /// Consumes the builder and produces a [`RichEntityStream`] configured with
    /// the builder's connection, store, view path, key filter, `take` and `skip`.
    ///
    /// The view path is passed twice (once as a clone), and `None` is passed for
    /// the argument between the key filter and `take`.
    pub fn rich(self) -> RichEntityStream<T> {
        let Self {
            connection,
            store,
            view_path,
            key_filter,
            take,
            skip,
            ..
        } = self;
        let path_copy = view_path.clone();

        RichEntityStream::new_lazy_with_opts(
            connection, store, path_copy, view_path, key_filter, None, take, skip,
        )
    }
}
impl<T> Stream for WatchBuilder<T>
where
    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
{
    type Item = Update<T>;

    /// Polls the inner `EntityStream`, creating it on the first call.
    ///
    /// The inner stream is built from clones of the builder's connection, store,
    /// view path and key filter plus the current `take`/`skip` values. Once
    /// created it is kept in `self.stream` and reused on every later poll.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Split the builder into per-field borrows so the initializer closure can
        // read the configuration while `stream` is mutably borrowed.
        let Self {
            connection,
            store,
            view_path,
            key_filter,
            take,
            skip,
            stream,
        } = self.get_mut();

        let inner = stream.get_or_insert_with(|| {
            EntityStream::new_lazy_with_opts(
                connection.clone(),
                store.clone(),
                view_path.clone(),
                view_path.clone(),
                key_filter.clone(),
                None,
                *take,
                *skip,
            )
        });

        Pin::new(inner).poll_next(cx)
    }
}
/// Factory for [`ViewHandle`]s.
///
/// Holds the connection, store and initial-data timeout that are copied into
/// every handle created through `view`.
pub struct ViewBuilder {
/// Connection cloned into each created handle.
connection: ConnectionManager,
/// Store cloned into each created handle.
store: SharedStore,
/// Timeout copied into each created handle.
initial_data_timeout: Duration,
}
impl ViewBuilder {
    /// Creates a builder from the given connection, store and timeout.
    pub fn new(
        connection: ConnectionManager,
        store: SharedStore,
        initial_data_timeout: Duration,
    ) -> Self {
        Self {
            initial_data_timeout,
            connection,
            store,
        }
    }

    /// Borrows the connection held by this builder.
    pub fn connection(&self) -> &ConnectionManager {
        &self.connection
    }

    /// Borrows the store held by this builder.
    pub fn store(&self) -> &SharedStore {
        &self.store
    }

    /// Returns the timeout that created handles will use.
    pub fn initial_data_timeout(&self) -> Duration {
        self.initial_data_timeout
    }

    /// Creates a [`ViewHandle`] for `view_path`, typed as `T`.
    ///
    /// The handle receives clones of the builder's connection and store, an
    /// owned copy of `view_path`, and the builder's timeout.
    pub fn view<T>(&self, view_path: &str) -> ViewHandle<T>
    where
        T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
    {
        ViewHandle {
            connection: self.connection.clone(),
            store: self.store.clone(),
            view_path: view_path.to_owned(),
            initial_data_timeout: self.initial_data_timeout,
            _marker: PhantomData,
        }
    }
}
/// A set of views that can be constructed from a [`ViewBuilder`].
///
/// Implementors name the entity type they belong to and provide a constructor
/// that takes ownership of a builder.
pub trait Views: Sized {
/// Entity type associated with this set of views.
type Entity: Entity;
/// Builds the implementing type from `builder`.
fn from_builder(builder: ViewBuilder) -> Self;
}
/// Typed accessor for individual, key-addressed entries of a view.
///
/// `T` is the type entries are read as. No `T` value is stored; the parameter
/// is carried only through `PhantomData`.
pub struct StateView<T> {
/// Connection used to ensure a per-key subscription exists.
connection: ConnectionManager,
/// Store entries are read from.
store: SharedStore,
/// Path identifying the view for both the connection and the store.
view_path: String,
/// Timeout handed to the store when waiting for the view to become ready.
initial_data_timeout: Duration,
/// Ties the view to `T` without holding a value of that type.
_marker: PhantomData<T>,
}
impl<T> StateView<T>
where
    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
{
    /// Creates a state view over `view_path` using the given connection, store
    /// and initial-data timeout.
    pub fn new(
        connection: ConnectionManager,
        store: SharedStore,
        view_path: String,
        initial_data_timeout: Duration,
    ) -> Self {
        Self {
            _marker: PhantomData,
            initial_data_timeout,
            view_path,
            store,
            connection,
        }
    }

    /// Looks up the entry stored under `key`.
    ///
    /// Runs three steps in order: ensures a subscription for the view path and
    /// `key`, waits on the store for the view to become ready using
    /// `initial_data_timeout`, and then reads the entry as `T`. The results of
    /// the first two awaits are not inspected.
    pub async fn get(&self, key: &str) -> Option<T> {
        let path = &self.view_path;

        self.connection
            .ensure_subscription(path, Some(key))
            .await;
        self.store
            .wait_for_view_ready(path, self.initial_data_timeout)
            .await;

        self.store.get::<T>(path, key).await
    }

    /// Creates an [`EntityStream`] for the single entry `key`.
    ///
    /// The stream is built with `EntityStream::new_lazy` from clones of this
    /// view's connection, store and path, a `KeyFilter::Single` for `key`, and
    /// `key` again as the final optional argument.
    pub fn watch(&self, key: &str) -> EntityStream<T> {
        let owned_key = key.to_string();
        let single = KeyFilter::Single(owned_key.clone());

        EntityStream::new_lazy(
            self.connection.clone(),
            self.store.clone(),
            self.view_path.clone(),
            self.view_path.clone(),
            single,
            Some(owned_key),
        )
    }

    /// Creates a [`RichEntityStream`] for the single entry `key`.
    ///
    /// Takes the same arguments as [`StateView::watch`], but constructs the
    /// stream with `RichEntityStream::new_lazy`.
    pub fn watch_rich(&self, key: &str) -> RichEntityStream<T> {
        let owned_key = key.to_string();
        let single = KeyFilter::Single(owned_key.clone());

        RichEntityStream::new_lazy(
            self.connection.clone(),
            self.store.clone(),
            self.view_path.clone(),
            self.view_path.clone(),
            single,
            Some(owned_key),
        )
    }
}