1#![warn(missing_docs)]
2
3mod sink;
44
45pub use sink::{BlockingSink, Dispatch, SyncProjectionSink};
46
47use std::sync::Arc;
48
49use ai_store_core::{Committed, Event, Label, Patch, Seq, Store, StoreError, StreamId, Timestamp};
50use serde_json::Value;
51use tokio::runtime::{Handle, Runtime};
52
53enum Driver {
55 Owned(Runtime),
57 Borrowed(Handle),
59}
60
61impl Driver {
62 fn block_on<F: std::future::Future>(&self, fut: F) -> F::Output {
63 match self {
64 Driver::Owned(rt) => rt.block_on(fut),
65 Driver::Borrowed(h) => h.block_on(fut),
66 }
67 }
68}
69
70pub struct BlockingStore {
76 inner: Store,
77 driver: Arc<Driver>,
78}
79
80impl Clone for BlockingStore {
81 fn clone(&self) -> Self {
82 Self {
83 inner: self.inner.clone(),
84 driver: Arc::clone(&self.driver),
85 }
86 }
87}
88
89impl BlockingStore {
90 pub fn new(store: Store) -> std::io::Result<Self> {
97 let rt = tokio::runtime::Builder::new_current_thread()
98 .enable_all()
99 .build()?;
100 Ok(Self {
101 inner: store,
102 driver: Arc::new(Driver::Owned(rt)),
103 })
104 }
105
106 pub fn with_handle(store: Store, handle: Handle) -> Self {
112 Self {
113 inner: store,
114 driver: Arc::new(Driver::Borrowed(handle)),
115 }
116 }
117
118 pub fn as_async(&self) -> &Store {
122 &self.inner
123 }
124
125 pub fn append(
127 &self,
128 stream: &StreamId,
129 kind: &str,
130 patch: Patch,
131 meta: Value,
132 ) -> Result<Committed, StoreError> {
133 self.driver
134 .block_on(self.inner.append(stream, kind, patch, meta))
135 }
136
137 pub fn import_event(
139 &self,
140 stream: &StreamId,
141 kind: &str,
142 patch: Patch,
143 meta: Value,
144 at: Timestamp,
145 ) -> Result<Committed, StoreError> {
146 self.driver
147 .block_on(self.inner.import_event(stream, kind, patch, meta, at))
148 }
149
150 pub fn state(&self, stream: &StreamId) -> Result<Value, StoreError> {
152 self.driver.block_on(self.inner.state(stream))
153 }
154
155 pub fn state_at(&self, stream: &StreamId, at: Seq) -> Result<Value, StoreError> {
157 self.driver.block_on(self.inner.state_at(stream, at))
158 }
159
160 pub fn revert(&self, stream: &StreamId, to: Seq) -> Result<Committed, StoreError> {
162 self.driver.block_on(self.inner.revert(stream, to))
163 }
164
165 pub fn revert_with_meta(
167 &self,
168 stream: &StreamId,
169 to: Seq,
170 extra_meta: Value,
171 ) -> Result<Committed, StoreError> {
172 self.driver
173 .block_on(self.inner.revert_with_meta(stream, to, extra_meta))
174 }
175
176 pub fn read(
178 &self,
179 stream: &StreamId,
180 from: Seq,
181 limit: usize,
182 ) -> Result<Vec<Event>, StoreError> {
183 self.driver.block_on(self.inner.read(stream, from, limit))
184 }
185
186 pub fn read_by_meta(
188 &self,
189 stream: &StreamId,
190 field: &str,
191 value: &Value,
192 from: Seq,
193 limit: usize,
194 ) -> Result<Vec<Event>, StoreError> {
195 self.driver
196 .block_on(self.inner.read_by_meta(stream, field, value, from, limit))
197 }
198
199 pub fn head(&self, stream: &StreamId) -> Result<Option<Seq>, StoreError> {
201 self.driver.block_on(self.inner.head(stream))
202 }
203
204 pub fn seq_at_time(&self, stream: &StreamId, at: Timestamp) -> Result<Option<Seq>, StoreError> {
206 self.driver.block_on(self.inner.seq_at_time(stream, at))
207 }
208
209 pub fn streams(&self) -> Result<Vec<StreamId>, StoreError> {
211 self.driver.block_on(self.inner.streams())
212 }
213
214 pub fn label_set(&self, stream: &StreamId, label: &Label, at: Seq) -> Result<(), StoreError> {
216 self.driver
217 .block_on(self.inner.label_set(stream, label, at))
218 }
219
220 pub fn label_resolve(&self, stream: &StreamId, label: &Label) -> Result<Seq, StoreError> {
222 self.driver
223 .block_on(self.inner.label_resolve(stream, label))
224 }
225
226 pub fn labels(&self, stream: &StreamId) -> Result<Vec<(Label, Seq)>, StoreError> {
228 self.driver.block_on(self.inner.labels(stream))
229 }
230
231 pub fn label_delete(&self, stream: &StreamId, label: &Label) -> Result<bool, StoreError> {
233 self.driver.block_on(self.inner.label_delete(stream, label))
234 }
235
236 pub fn materialize_to_sink(
238 &self,
239 stream: &StreamId,
240 sink_id: &str,
241 at: Option<Seq>,
242 ) -> Result<Seq, StoreError> {
243 self.driver
244 .block_on(self.inner.materialize_to_sink(stream, sink_id, at))
245 }
246
247 pub fn catch_up(&self, sink_id: &str) -> Result<ai_store_core::CatchUpReport, StoreError> {
249 self.driver.block_on(self.inner.catch_up(sink_id))
250 }
251
252 pub fn rebuild(&self, sink_id: &str) -> Result<ai_store_core::CatchUpReport, StoreError> {
254 self.driver.block_on(self.inner.rebuild(sink_id))
255 }
256}