use std::{
ops::Deref,
pin::Pin,
};
use futures::{
task,
Stream,
StreamExt,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::{
errors::BroadcastStreamRecvError,
BroadcastStream,
};
use crate::{
base_client::{
FunctionResult,
QueryResults,
SubscriberId,
},
client::worker::{
ClientRequest,
UnsubscribeRequest,
},
};
#[cfg(doc)]
use crate::{
ConvexClient,
Value,
};
pub struct QuerySubscription {
pub(super) subscriber_id: SubscriberId,
pub(super) request_sender: mpsc::UnboundedSender<ClientRequest>,
pub(super) watch: BroadcastStream<QueryResults>,
pub(super) initial: Option<FunctionResult>,
}
impl QuerySubscription {
pub fn id(&self) -> &SubscriberId {
&self.subscriber_id
}
}
impl std::fmt::Debug for QuerySubscription {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QuerySubscription")
.field("subscriber_id", &self.subscriber_id)
.finish()
}
}
impl Deref for QuerySubscription {
type Target = SubscriberId;
fn deref(&self) -> &SubscriberId {
&self.subscriber_id
}
}
impl Drop for QuerySubscription {
fn drop(&mut self) {
let _ = self
.request_sender
.send(ClientRequest::Unsubscribe(UnsubscribeRequest {
subscriber_id: self.subscriber_id,
}));
}
}
impl Stream for QuerySubscription {
type Item = FunctionResult;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
if let Some(initial) = self.initial.take() {
return task::Poll::Ready(Some(initial));
}
loop {
return match self.watch.poll_next_unpin(cx) {
task::Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(_amt)))) => continue,
task::Poll::Ready(Some(Ok(map))) => {
let Some(value) = map.get(self.id()) else {
continue;
};
task::Poll::Ready(Some(value.clone()))
},
task::Poll::Ready(None) => task::Poll::Ready(None),
task::Poll::Pending => task::Poll::Pending,
};
}
}
}
pub struct QuerySetSubscription {
watch: BroadcastStream<QueryResults>,
}
impl QuerySetSubscription {
pub(super) fn new(watch: BroadcastStream<QueryResults>) -> Self {
Self { watch }
}
}
impl Stream for QuerySetSubscription {
type Item = QueryResults;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
loop {
return match self.watch.poll_next_unpin(cx) {
task::Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(_amt)))) => continue,
task::Poll::Ready(Some(Ok(map))) => task::Poll::Ready(Some(map)),
task::Poll::Ready(None) => task::Poll::Ready(None),
task::Poll::Pending => task::Poll::Pending,
};
}
}
}