ai_store_sync/lib.rs
1#![warn(missing_docs)]
2
3//! # ai-store-sync
4//!
5//! Blocking (synchronous) facade over [`ai_store_core::Store`].
6//!
7//! ## Why
8//!
9//! `Store` is async by design; the SPI traits (`EventBackend`, `CacheBackend`,
//! `ProjectionSink`) are `async_trait`. Consumers embedded in an otherwise
//! synchronous codebase would have to spin up a `tokio::runtime::Runtime`
//! themselves, wrap every call in `runtime.block_on(...)`, and get the details
//! right (current-thread vs multi-thread, runtime lifetime, `Send` bounds).
14//!
15//! [`BlockingStore`] does that once, in-tree.
16//!
17//! ## Choosing a constructor
18//!
19//! - [`BlockingStore::new`] — owns a dedicated `current_thread` runtime. Use
20//! this from a plain synchronous `main` / thread when the caller has no
21//! tokio runtime of its own. Analogous to `reqwest::blocking::Client::new`.
22//! - [`BlockingStore::with_handle`] — borrows an existing [`tokio::runtime::Handle`].
23//! Use this when the surrounding process already runs a runtime (e.g. a
24//! library that hosts tokio internally and hands a `Handle` down to sync
25//! plugin code).
26//!
27//! ## Nested-runtime pitfall
28//!
29//! Do **not** call a `BlockingStore` method from inside an async task on the
30//! same tokio runtime — that would attempt to `block_on` from within a runtime
31//! worker and will panic. If you must bridge from async code:
32//!
33//! - prefer calling `Store` directly with `.await`, or
34//! - wrap the blocking call in
35//! [`tokio::task::block_in_place`](https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html)
36//! on a multi-thread runtime.
37//!
38//! ## Errors
39//!
40//! All methods return [`StoreError`] verbatim from the async facade. No new
41//! error variants are introduced.
42
43mod sink;
44
45pub use sink::{BlockingSink, Dispatch, SyncProjectionSink};
46
47use std::sync::Arc;
48
49use ai_store_core::{Event, Label, Patch, Seq, Store, StoreError, StreamId, Timestamp};
50use serde_json::Value;
51use tokio::runtime::{Handle, Runtime};
52
53/// How the blocking facade drives async calls.
54enum Driver {
55 /// Owned `current_thread` runtime, created by [`BlockingStore::new`].
56 Owned(Runtime),
57 /// Borrowed handle from a runtime the caller already runs.
58 Borrowed(Handle),
59}
60
61impl Driver {
62 fn block_on<F: std::future::Future>(&self, fut: F) -> F::Output {
63 match self {
64 Driver::Owned(rt) => rt.block_on(fut),
65 Driver::Borrowed(h) => h.block_on(fut),
66 }
67 }
68}
69
70/// Synchronous mirror of [`Store`].
71///
72/// Cheap to clone: only the inner `Store` handle is `Arc`-shared. The runtime
73/// driver is shared through `Arc` as well, so cloned handles all drive the
74/// same runtime.
75pub struct BlockingStore {
76 inner: Store,
77 driver: Arc<Driver>,
78}
79
80impl Clone for BlockingStore {
81 fn clone(&self) -> Self {
82 Self {
83 inner: self.inner.clone(),
84 driver: Arc::clone(&self.driver),
85 }
86 }
87}
88
89impl BlockingStore {
90 /// Build a `BlockingStore` that owns a dedicated `current_thread` tokio
91 /// runtime.
92 ///
93 /// Fails only if the runtime cannot be constructed (rare — typically an
94 /// exhausted file descriptor budget). See the crate-level docs for the
95 /// nested-runtime pitfall.
96 pub fn new(store: Store) -> std::io::Result<Self> {
97 let rt = tokio::runtime::Builder::new_current_thread()
98 .enable_all()
99 .build()?;
100 Ok(Self {
101 inner: store,
102 driver: Arc::new(Driver::Owned(rt)),
103 })
104 }
105
106 /// Build a `BlockingStore` that drives calls on the caller's runtime,
107 /// identified by a [`Handle`].
108 ///
109 /// Prefer this constructor when the surrounding process already owns a
110 /// tokio runtime — reusing the handle avoids spawning a second one.
111 pub fn with_handle(store: Store, handle: Handle) -> Self {
112 Self {
113 inner: store,
114 driver: Arc::new(Driver::Borrowed(handle)),
115 }
116 }
117
118 /// Access the underlying async [`Store`]. Useful when a caller needs to
119 /// mix async and blocking paths, or hand the async handle to a
120 /// concurrently-running task.
121 pub fn as_async(&self) -> &Store {
122 &self.inner
123 }
124
125 /// See [`Store::append`].
126 pub fn append(
127 &self,
128 stream: &StreamId,
129 kind: &str,
130 patch: Patch,
131 meta: Value,
132 ) -> Result<Seq, StoreError> {
133 self.driver
134 .block_on(self.inner.append(stream, kind, patch, meta))
135 }
136
137 /// See [`Store::state`].
138 pub fn state(&self, stream: &StreamId) -> Result<Value, StoreError> {
139 self.driver.block_on(self.inner.state(stream))
140 }
141
142 /// See [`Store::state_at`].
143 pub fn state_at(&self, stream: &StreamId, at: Seq) -> Result<Value, StoreError> {
144 self.driver.block_on(self.inner.state_at(stream, at))
145 }
146
147 /// See [`Store::revert`].
148 pub fn revert(&self, stream: &StreamId, to: Seq) -> Result<Seq, StoreError> {
149 self.driver.block_on(self.inner.revert(stream, to))
150 }
151
152 /// See [`Store::read`].
153 pub fn read(
154 &self,
155 stream: &StreamId,
156 from: Seq,
157 limit: usize,
158 ) -> Result<Vec<Event>, StoreError> {
159 self.driver.block_on(self.inner.read(stream, from, limit))
160 }
161
162 /// See [`Store::read_by_meta`].
163 pub fn read_by_meta(
164 &self,
165 stream: &StreamId,
166 field: &str,
167 value: &Value,
168 from: Seq,
169 limit: usize,
170 ) -> Result<Vec<Event>, StoreError> {
171 self.driver
172 .block_on(self.inner.read_by_meta(stream, field, value, from, limit))
173 }
174
175 /// See [`Store::head`].
176 pub fn head(&self, stream: &StreamId) -> Result<Option<Seq>, StoreError> {
177 self.driver.block_on(self.inner.head(stream))
178 }
179
180 /// See [`Store::seq_at_time`].
181 pub fn seq_at_time(&self, stream: &StreamId, at: Timestamp) -> Result<Option<Seq>, StoreError> {
182 self.driver.block_on(self.inner.seq_at_time(stream, at))
183 }
184
185 /// See [`Store::streams`].
186 pub fn streams(&self) -> Result<Vec<StreamId>, StoreError> {
187 self.driver.block_on(self.inner.streams())
188 }
189
190 /// See [`Store::label_set`].
191 pub fn label_set(&self, stream: &StreamId, label: &Label, at: Seq) -> Result<(), StoreError> {
192 self.driver
193 .block_on(self.inner.label_set(stream, label, at))
194 }
195
196 /// See [`Store::label_resolve`].
197 pub fn label_resolve(&self, stream: &StreamId, label: &Label) -> Result<Seq, StoreError> {
198 self.driver
199 .block_on(self.inner.label_resolve(stream, label))
200 }
201
202 /// See [`Store::labels`].
203 pub fn labels(&self, stream: &StreamId) -> Result<Vec<(Label, Seq)>, StoreError> {
204 self.driver.block_on(self.inner.labels(stream))
205 }
206
207 /// See [`Store::catch_up`].
208 pub fn catch_up(&self, sink_id: &str) -> Result<ai_store_core::CatchUpReport, StoreError> {
209 self.driver.block_on(self.inner.catch_up(sink_id))
210 }
211
212 /// See [`Store::rebuild`].
213 pub fn rebuild(&self, sink_id: &str) -> Result<ai_store_core::CatchUpReport, StoreError> {
214 self.driver.block_on(self.inner.rebuild(sink_id))
215 }
216}