Skip to main content

ai_store_sync/
lib.rs

1#![warn(missing_docs)]
2
3//! # ai-store-sync
4//!
5//! Blocking (synchronous) facade over [`ai_store_core::Store`].
6//!
7//! ## Why
8//!
//! `Store` is async by design; the SPI traits (`EventBackend`, `CacheBackend`,
//! `ProjectionSink`) are `async_trait`. Without this crate, consumers embedded
//! in an otherwise synchronous codebase would have to spin up a
//! `tokio::runtime::Runtime`, wrap every call in `runtime.block_on(...)`, and
//! get the details right (current-thread vs multi-thread, runtime lifetime,
//! `Send` bounds).
14//!
15//! [`BlockingStore`] does that once, in-tree.
16//!
17//! ## Choosing a constructor
18//!
19//! - [`BlockingStore::new`] — owns a dedicated `current_thread` runtime. Use
20//!   this from a plain synchronous `main` / thread when the caller has no
21//!   tokio runtime of its own. Analogous to `reqwest::blocking::Client::new`.
22//! - [`BlockingStore::with_handle`] — borrows an existing [`tokio::runtime::Handle`].
23//!   Use this when the surrounding process already runs a runtime (e.g. a
24//!   library that hosts tokio internally and hands a `Handle` down to sync
25//!   plugin code).
26//!
27//! ## Nested-runtime pitfall
28//!
//! Do **not** call a `BlockingStore` method from inside an async task — on the
//! same tokio runtime or on any other one. `block_on` panics when it is invoked
//! from a thread that is currently driving async tasks, and that holds for a
//! store built with [`BlockingStore::new`] too. If you must bridge from async code:
32//!
33//! - prefer calling `Store` directly with `.await`, or
34//! - wrap the blocking call in
35//!   [`tokio::task::block_in_place`](https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html)
36//!   on a multi-thread runtime.
37//!
38//! ## Errors
39//!
40//! All methods return [`StoreError`] verbatim from the async facade. No new
41//! error variants are introduced.
42
43mod sink;
44
45pub use sink::{BlockingSink, Dispatch, SyncProjectionSink};
46
47use std::sync::Arc;
48
49use ai_store_core::{Committed, Event, Label, Patch, Seq, Store, StoreError, StreamId, Timestamp};
50use serde_json::Value;
51use tokio::runtime::{Handle, Runtime};
52
53/// How the blocking facade drives async calls.
54enum Driver {
55    /// Owned `current_thread` runtime, created by [`BlockingStore::new`].
56    Owned(Runtime),
57    /// Borrowed handle from a runtime the caller already runs.
58    Borrowed(Handle),
59}
60
61impl Driver {
62    fn block_on<F: std::future::Future>(&self, fut: F) -> F::Output {
63        match self {
64            Driver::Owned(rt) => rt.block_on(fut),
65            Driver::Borrowed(h) => h.block_on(fut),
66        }
67    }
68}
69
70/// Synchronous mirror of [`Store`].
71///
72/// Cheap to clone: only the inner `Store` handle is `Arc`-shared. The runtime
73/// driver is shared through `Arc` as well, so cloned handles all drive the
74/// same runtime.
75pub struct BlockingStore {
76    inner: Store,
77    driver: Arc<Driver>,
78}
79
80impl Clone for BlockingStore {
81    fn clone(&self) -> Self {
82        Self {
83            inner: self.inner.clone(),
84            driver: Arc::clone(&self.driver),
85        }
86    }
87}
88
89impl BlockingStore {
90    /// Build a `BlockingStore` that owns a dedicated `current_thread` tokio
91    /// runtime.
92    ///
93    /// Fails only if the runtime cannot be constructed (rare — typically an
94    /// exhausted file descriptor budget). See the crate-level docs for the
95    /// nested-runtime pitfall.
96    pub fn new(store: Store) -> std::io::Result<Self> {
97        let rt = tokio::runtime::Builder::new_current_thread()
98            .enable_all()
99            .build()?;
100        Ok(Self {
101            inner: store,
102            driver: Arc::new(Driver::Owned(rt)),
103        })
104    }
105
106    /// Build a `BlockingStore` that drives calls on the caller's runtime,
107    /// identified by a [`Handle`].
108    ///
109    /// Prefer this constructor when the surrounding process already owns a
110    /// tokio runtime — reusing the handle avoids spawning a second one.
111    pub fn with_handle(store: Store, handle: Handle) -> Self {
112        Self {
113            inner: store,
114            driver: Arc::new(Driver::Borrowed(handle)),
115        }
116    }
117
118    /// Access the underlying async [`Store`]. Useful when a caller needs to
119    /// mix async and blocking paths, or hand the async handle to a
120    /// concurrently-running task.
121    pub fn as_async(&self) -> &Store {
122        &self.inner
123    }
124
125    /// See [`Store::append`].
126    pub fn append(
127        &self,
128        stream: &StreamId,
129        kind: &str,
130        patch: Patch,
131        meta: Value,
132    ) -> Result<Committed, StoreError> {
133        self.driver
134            .block_on(self.inner.append(stream, kind, patch, meta))
135    }
136
137    /// See [`Store::import_event`].
138    pub fn import_event(
139        &self,
140        stream: &StreamId,
141        kind: &str,
142        patch: Patch,
143        meta: Value,
144        at: Timestamp,
145    ) -> Result<Committed, StoreError> {
146        self.driver
147            .block_on(self.inner.import_event(stream, kind, patch, meta, at))
148    }
149
150    /// See [`Store::state`].
151    pub fn state(&self, stream: &StreamId) -> Result<Value, StoreError> {
152        self.driver.block_on(self.inner.state(stream))
153    }
154
155    /// See [`Store::state_at`].
156    pub fn state_at(&self, stream: &StreamId, at: Seq) -> Result<Value, StoreError> {
157        self.driver.block_on(self.inner.state_at(stream, at))
158    }
159
160    /// See [`Store::revert`].
161    pub fn revert(&self, stream: &StreamId, to: Seq) -> Result<Committed, StoreError> {
162        self.driver.block_on(self.inner.revert(stream, to))
163    }
164
165    /// See [`Store::revert_with_meta`].
166    pub fn revert_with_meta(
167        &self,
168        stream: &StreamId,
169        to: Seq,
170        extra_meta: Value,
171    ) -> Result<Committed, StoreError> {
172        self.driver
173            .block_on(self.inner.revert_with_meta(stream, to, extra_meta))
174    }
175
176    /// See [`Store::read`].
177    pub fn read(
178        &self,
179        stream: &StreamId,
180        from: Seq,
181        limit: usize,
182    ) -> Result<Vec<Event>, StoreError> {
183        self.driver.block_on(self.inner.read(stream, from, limit))
184    }
185
186    /// See [`Store::read_by_meta`].
187    pub fn read_by_meta(
188        &self,
189        stream: &StreamId,
190        field: &str,
191        value: &Value,
192        from: Seq,
193        limit: usize,
194    ) -> Result<Vec<Event>, StoreError> {
195        self.driver
196            .block_on(self.inner.read_by_meta(stream, field, value, from, limit))
197    }
198
199    /// See [`Store::head`].
200    pub fn head(&self, stream: &StreamId) -> Result<Option<Seq>, StoreError> {
201        self.driver.block_on(self.inner.head(stream))
202    }
203
204    /// See [`Store::seq_at_time`].
205    pub fn seq_at_time(&self, stream: &StreamId, at: Timestamp) -> Result<Option<Seq>, StoreError> {
206        self.driver.block_on(self.inner.seq_at_time(stream, at))
207    }
208
209    /// See [`Store::streams`].
210    pub fn streams(&self) -> Result<Vec<StreamId>, StoreError> {
211        self.driver.block_on(self.inner.streams())
212    }
213
214    /// See [`Store::label_set`].
215    pub fn label_set(&self, stream: &StreamId, label: &Label, at: Seq) -> Result<(), StoreError> {
216        self.driver
217            .block_on(self.inner.label_set(stream, label, at))
218    }
219
220    /// See [`Store::label_resolve`].
221    pub fn label_resolve(&self, stream: &StreamId, label: &Label) -> Result<Seq, StoreError> {
222        self.driver
223            .block_on(self.inner.label_resolve(stream, label))
224    }
225
226    /// See [`Store::labels`].
227    pub fn labels(&self, stream: &StreamId) -> Result<Vec<(Label, Seq)>, StoreError> {
228        self.driver.block_on(self.inner.labels(stream))
229    }
230
231    /// See [`Store::label_delete`].
232    pub fn label_delete(&self, stream: &StreamId, label: &Label) -> Result<bool, StoreError> {
233        self.driver.block_on(self.inner.label_delete(stream, label))
234    }
235
236    /// See [`Store::materialize_to_sink`].
237    pub fn materialize_to_sink(
238        &self,
239        stream: &StreamId,
240        sink_id: &str,
241        at: Option<Seq>,
242    ) -> Result<Seq, StoreError> {
243        self.driver
244            .block_on(self.inner.materialize_to_sink(stream, sink_id, at))
245    }
246
247    /// See [`Store::catch_up`].
248    pub fn catch_up(&self, sink_id: &str) -> Result<ai_store_core::CatchUpReport, StoreError> {
249        self.driver.block_on(self.inner.catch_up(sink_id))
250    }
251
252    /// See [`Store::rebuild`].
253    pub fn rebuild(&self, sink_id: &str) -> Result<ai_store_core::CatchUpReport, StoreError> {
254        self.driver.block_on(self.inner.rebuild(sink_id))
255    }
256}