ai_store_sync/sink.rs
1//! Synchronous [`ProjectionSink`] bridge.
2//!
3//! [`SyncProjectionSink`] is the sibling of [`ProjectionSink`] with sync
4//! method signatures. [`BlockingSink`] wraps an implementation of
5//! [`SyncProjectionSink`] and exposes it to the async facade.
6//!
7//! ## Two dispatch modes
8//!
//! - [`Dispatch::SpawnBlocking`] (default via [`BlockingSink::new`]) hands
//!   each `commit` / `on_label_set` / `on_label_deleted` call off to
//!   `tokio::task::spawn_blocking`. Use
11//! this whenever the sync method may block for more than a few hundred
12//! microseconds — file I/O, `fsync`, database drivers with only sync APIs.
13//! Requires a runtime with a blocking pool (any `tokio::runtime` built via
14//! `Builder::new_multi_thread()` or `Builder::new_current_thread()` with
15//! `enable_all()` has one).
16//!
17//! - [`Dispatch::Inline`] (via [`BlockingSink::inline`]) runs the sync method
18//! directly on the async worker. Correct only for fast in-memory
19//! bookkeeping — anything else stalls the runtime.
20//!
21//! ## Idempotence
22//!
23//! [`SyncProjectionSink`] inherits the same idempotence contract as
24//! [`ProjectionSink`]: replaying the same `(stream, seq)` must produce the
25//! same effect as the first application, because `Store::catch_up` and
26//! `Store::rebuild` may re-drive events after a crash or configuration
27//! change.
28
29use std::sync::Arc;
30
31use ai_store_core::{Event, Label, ProjectionSink, Seq, StoreError, StreamId};
32use async_trait::async_trait;
33use serde_json::Value;
34
35/// Synchronous variant of [`ProjectionSink`]. Implement this when the sink's
36/// body is inherently blocking (file I/O, synchronous database drivers,
37/// stdlib `println!`), then wrap in [`BlockingSink`] to plug into the async
38/// facade.
39pub trait SyncProjectionSink: Send + Sync + 'static {
40 /// Stable identifier used as the checkpoint key. See
41 /// [`ProjectionSink::id`].
42 fn id(&self) -> &str;
43
44 /// Apply one committed event. Must be idempotent under retries.
45 fn commit(
46 &self,
47 stream: &StreamId,
48 seq: Seq,
49 state: &Value,
50 event: &Event,
51 ) -> Result<(), StoreError>;
52
53 /// React to a label being pinned or moved. Default is a no-op, matching
54 /// the async trait's default.
55 fn on_label_set(
56 &self,
57 _stream: &StreamId,
58 _label: &Label,
59 _at: Seq,
60 _state: &Value,
61 ) -> Result<(), StoreError> {
62 Ok(())
63 }
64
65 /// React to a label being deleted. Default is a no-op, matching the
66 /// async trait's default.
67 fn on_label_deleted(&self, _stream: &StreamId, _label: &Label) -> Result<(), StoreError> {
68 Ok(())
69 }
70}
71
/// Strategy [`BlockingSink`] uses to invoke the wrapped
/// [`SyncProjectionSink`].
///
/// `Dispatch::default()` is [`Dispatch::SpawnBlocking`], the same mode
/// [`BlockingSink::new`] selects.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum Dispatch {
    /// Run the sync method inline on the async worker.
    ///
    /// Safe only when the method is guaranteed to return in a few hundred
    /// microseconds or less. Anything else blocks the runtime.
    Inline,
    /// Offload to `tokio::task::spawn_blocking`.
    ///
    /// The default. Correct for any sink that touches the file system,
    /// synchronous DB drivers, or otherwise blocking code.
    #[default]
    SpawnBlocking,
}
87
88/// Adapter turning a [`SyncProjectionSink`] into a [`ProjectionSink`].
89pub struct BlockingSink<T: SyncProjectionSink> {
90 inner: Arc<T>,
91 dispatch: Dispatch,
92}
93
94impl<T: SyncProjectionSink> BlockingSink<T> {
95 /// Wrap `inner` in `spawn_blocking` dispatch — the safe default for any
96 /// sink that may block.
97 pub fn new(inner: T) -> Self {
98 Self {
99 inner: Arc::new(inner),
100 dispatch: Dispatch::SpawnBlocking,
101 }
102 }
103
104 /// Wrap `inner` in inline dispatch. Use only for fast in-memory sinks;
105 /// see [`Dispatch::Inline`].
106 pub fn inline(inner: T) -> Self {
107 Self {
108 inner: Arc::new(inner),
109 dispatch: Dispatch::Inline,
110 }
111 }
112
113 /// Access the wrapped sink. Useful for tests that need to peek at
114 /// accumulated state after the async dispatch loop advanced the
115 /// checkpoint.
116 pub fn inner(&self) -> &Arc<T> {
117 &self.inner
118 }
119}
120
121#[async_trait]
122impl<T: SyncProjectionSink> ProjectionSink for BlockingSink<T> {
123 fn id(&self) -> &str {
124 // `id()` on the trait returns a borrowed `&str` from `T`. Since
125 // `self.inner: Arc<T>` is stable for the lifetime of `&self`, this
126 // borrow is sound.
127 self.inner.id()
128 }
129
130 async fn commit(
131 &self,
132 stream: &StreamId,
133 seq: Seq,
134 state: &Value,
135 event: &Event,
136 ) -> Result<(), StoreError> {
137 match self.dispatch {
138 Dispatch::Inline => self.inner.commit(stream, seq, state, event),
139 Dispatch::SpawnBlocking => {
140 let inner = Arc::clone(&self.inner);
141 let stream = stream.clone();
142 let state = state.clone();
143 let event = event.clone();
144 tokio::task::spawn_blocking(move || inner.commit(&stream, seq, &state, &event))
145 .await
146 .map_err(|e| StoreError::Backend(format!("blocking sink join: {e}")))?
147 }
148 }
149 }
150
151 async fn on_label_set(
152 &self,
153 stream: &StreamId,
154 label: &Label,
155 at: Seq,
156 state: &Value,
157 ) -> Result<(), StoreError> {
158 match self.dispatch {
159 Dispatch::Inline => self.inner.on_label_set(stream, label, at, state),
160 Dispatch::SpawnBlocking => {
161 let inner = Arc::clone(&self.inner);
162 let stream = stream.clone();
163 let label = label.clone();
164 let state = state.clone();
165 tokio::task::spawn_blocking(move || inner.on_label_set(&stream, &label, at, &state))
166 .await
167 .map_err(|e| StoreError::Backend(format!("blocking sink join: {e}")))?
168 }
169 }
170 }
171
172 async fn on_label_deleted(&self, stream: &StreamId, label: &Label) -> Result<(), StoreError> {
173 match self.dispatch {
174 Dispatch::Inline => self.inner.on_label_deleted(stream, label),
175 Dispatch::SpawnBlocking => {
176 let inner = Arc::clone(&self.inner);
177 let stream = stream.clone();
178 let label = label.clone();
179 tokio::task::spawn_blocking(move || inner.on_label_deleted(&stream, &label))
180 .await
181 .map_err(|e| StoreError::Backend(format!("blocking sink join: {e}")))?
182 }
183 }
184 }
185}