#![warn(missing_docs)]
mod sink;
pub use sink::{BlockingSink, Dispatch, SyncProjectionSink};
use std::sync::Arc;
use ai_store_core::{Event, Label, Patch, Seq, Store, StoreError, StreamId, Timestamp};
use serde_json::Value;
use tokio::runtime::{Handle, Runtime};
enum Driver {
Owned(Runtime),
Borrowed(Handle),
}
impl Driver {
fn block_on<F: std::future::Future>(&self, fut: F) -> F::Output {
match self {
Driver::Owned(rt) => rt.block_on(fut),
Driver::Borrowed(h) => h.block_on(fut),
}
}
}
pub struct BlockingStore {
inner: Store,
driver: Arc<Driver>,
}
impl Clone for BlockingStore {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
driver: Arc::clone(&self.driver),
}
}
}
impl BlockingStore {
pub fn new(store: Store) -> std::io::Result<Self> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
Ok(Self {
inner: store,
driver: Arc::new(Driver::Owned(rt)),
})
}
pub fn with_handle(store: Store, handle: Handle) -> Self {
Self {
inner: store,
driver: Arc::new(Driver::Borrowed(handle)),
}
}
pub fn as_async(&self) -> &Store {
&self.inner
}
pub fn append(
&self,
stream: &StreamId,
kind: &str,
patch: Patch,
meta: Value,
) -> Result<Seq, StoreError> {
self.driver
.block_on(self.inner.append(stream, kind, patch, meta))
}
pub fn state(&self, stream: &StreamId) -> Result<Value, StoreError> {
self.driver.block_on(self.inner.state(stream))
}
pub fn state_at(&self, stream: &StreamId, at: Seq) -> Result<Value, StoreError> {
self.driver.block_on(self.inner.state_at(stream, at))
}
pub fn revert(&self, stream: &StreamId, to: Seq) -> Result<Seq, StoreError> {
self.driver.block_on(self.inner.revert(stream, to))
}
pub fn read(
&self,
stream: &StreamId,
from: Seq,
limit: usize,
) -> Result<Vec<Event>, StoreError> {
self.driver.block_on(self.inner.read(stream, from, limit))
}
pub fn read_by_meta(
&self,
stream: &StreamId,
field: &str,
value: &Value,
from: Seq,
limit: usize,
) -> Result<Vec<Event>, StoreError> {
self.driver
.block_on(self.inner.read_by_meta(stream, field, value, from, limit))
}
pub fn head(&self, stream: &StreamId) -> Result<Option<Seq>, StoreError> {
self.driver.block_on(self.inner.head(stream))
}
pub fn seq_at_time(&self, stream: &StreamId, at: Timestamp) -> Result<Option<Seq>, StoreError> {
self.driver.block_on(self.inner.seq_at_time(stream, at))
}
pub fn streams(&self) -> Result<Vec<StreamId>, StoreError> {
self.driver.block_on(self.inner.streams())
}
pub fn label_set(&self, stream: &StreamId, label: &Label, at: Seq) -> Result<(), StoreError> {
self.driver
.block_on(self.inner.label_set(stream, label, at))
}
pub fn label_resolve(&self, stream: &StreamId, label: &Label) -> Result<Seq, StoreError> {
self.driver
.block_on(self.inner.label_resolve(stream, label))
}
pub fn labels(&self, stream: &StreamId) -> Result<Vec<(Label, Seq)>, StoreError> {
self.driver.block_on(self.inner.labels(stream))
}
pub fn label_delete(&self, stream: &StreamId, label: &Label) -> Result<bool, StoreError> {
self.driver.block_on(self.inner.label_delete(stream, label))
}
pub fn materialize_to_sink(
&self,
stream: &StreamId,
sink_id: &str,
at: Option<Seq>,
) -> Result<Seq, StoreError> {
self.driver
.block_on(self.inner.materialize_to_sink(stream, sink_id, at))
}
pub fn catch_up(&self, sink_id: &str) -> Result<ai_store_core::CatchUpReport, StoreError> {
self.driver.block_on(self.inner.catch_up(sink_id))
}
pub fn rebuild(&self, sink_id: &str) -> Result<ai_store_core::CatchUpReport, StoreError> {
self.driver.block_on(self.inner.rebuild(sink_id))
}
}