use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use serde::de::DeserializeOwned;
use surrealdb::types::{Action as CoreAction, Value};
pub use uuid::Uuid;
use crate::SomniaError;
/// Kind of change carried by a live-query [`Notification`].
///
/// Each variant mirrors the identically named variant of
/// `surrealdb::types::Action` that the stream implementation in this file
/// forwards to callers.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Action {
    /// Mapped from the upstream `Create` action.
    Create,
    /// Mapped from the upstream `Update` action.
    Update,
    /// Mapped from the upstream `Delete` action.
    Delete,
}
/// A single live-query event whose payload has been decoded into `T`.
#[derive(Clone, Debug)]
pub struct Notification<T> {
    /// Identifier of the live query, taken from the upstream notification.
    pub query_id: Uuid,
    /// What kind of change this notification reports.
    pub action: Action,
    /// The notification payload, deserialized into the caller's type.
    pub data: T,
}
/// Typed adapter over a raw SurrealDB live-query stream.
///
/// The wrapped stream produces untyped [`Value`] payloads; the `Stream`
/// implementation for this type converts each one into a
/// [`Notification<T>`].
#[must_use = "live-query streams are inert until polled; dropping one issues KILL"]
pub struct LiveQueryStream<T> {
    /// The underlying untyped notification stream.
    inner: surrealdb::Stream<Vec<Value>>,
    /// Records the target payload type without storing a `T`. Using a
    /// function-pointer marker keeps this struct's auto traits independent
    /// of `T`.
    _marker: PhantomData<fn() -> T>,
}
impl<T> LiveQueryStream<T> {
    /// Wraps an untyped SurrealDB notification stream so that its payloads
    /// are surfaced as `T`.
    pub(crate) fn new(inner: surrealdb::Stream<Vec<Value>>) -> Self {
        let _marker = PhantomData;
        Self { inner, _marker }
    }
}
/// Yields typed [`Notification`]s decoded from the wrapped live-query stream.
///
/// `T` does not need to be `Unpin`: the only place `T` appears in
/// `LiveQueryStream<T>` is inside `PhantomData<fn() -> T>`, and function
/// pointers are `Unpin` for every signature, so the struct is `Unpin`
/// whenever the inner stream is.
///
/// Item semantics:
/// * an error from the inner stream becomes `SomniaError::Connection`;
/// * `Create` / `Update` / `Delete` notifications are deserialized into `T`,
///   and a deserialization failure becomes `SomniaError::Deser`;
/// * a `Killed` notification ends the stream (`None`);
/// * any other action is reported as `SomniaError::Query`, using the
///   notification payload as the message.
impl<T: DeserializeOwned> Stream for LiveQueryStream<T> {
    type Item = Result<Notification<T>, SomniaError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // Pull the next raw notification, forwarding every non-data outcome
        // straight to the caller.
        let note = match Pin::new(&mut this.inner).poll_next(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(None) => return Poll::Ready(None),
            Poll::Ready(Some(Err(e))) => {
                return Poll::Ready(Some(Err(SomniaError::Connection(e.to_string()))));
            }
            Poll::Ready(Some(Ok(note))) => note,
        };

        let action = match note.action {
            CoreAction::Create => Action::Create,
            CoreAction::Update => Action::Update,
            CoreAction::Delete => Action::Delete,
            // The live query was killed: surface that as end-of-stream.
            CoreAction::Killed => return Poll::Ready(None),
            // Every remaining upstream action is treated as a query failure.
            // A string payload is used verbatim as the message; anything else
            // is rendered as JSON text.
            _ => {
                let msg = match note.data.into_json_value() {
                    serde_json::Value::String(s) => s,
                    other => other.to_string(),
                };
                return Poll::Ready(Some(Err(SomniaError::Query(msg))));
            }
        };

        let query_id = note.query_id.into_inner();
        let item = serde_json::from_value::<T>(note.data.into_json_value())
            .map(|data| Notification {
                query_id,
                action,
                data,
            })
            .map_err(|e| SomniaError::Deser(e.to_string()));
        Poll::Ready(Some(item))
    }
}